This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new e2655cf32 [GOBBLIN-2019] Remove circular dependency & initialize
multi-active execution related classes (#3899)
e2655cf32 is described below
commit e2655cf326eac1f5401e3b65c703efcd68621974
Author: umustafi <[email protected]>
AuthorDate: Fri Mar 29 13:15:16 2024 -0700
[GOBBLIN-2019] Remove circular dependency & initialize multi-active
execution related classes (#3899)
* Remove circular dependency & initialize executon related classes
* Remove default table names and fix compile bug
* Add test for getClass method
* Improve comments & javadocs
* Fix MysqlMultiActiveLeaseArbiterTest
* Create abstract factory class and update DagProc fields
* Refactor dagProc methods and use base class for mock in tests
* Use lease arbiter in single-active execution mode
* Delete dagAction before concluding lease
* Rename lease arbiter
* Simplify gobblinServiceManagerTest
* Update gobblinServiceManager create method
---
.../gobblin/configuration/ConfigurationKeys.java | 12 +-
.../src/main/avro/DagActionStoreChangeEvent.avsc | 5 +
.../runtime/DagActionStoreChangeMonitorTest.java | 3 +-
.../modules/core/GobblinServiceGuiceModule.java | 37 ++--
.../modules/core/GobblinServiceManager.java | 34 ++--
...nProcessingMultiActiveLeaseArbiterFactory.java} | 26 +--
.../orchestration/DagActionReminderScheduler.java | 103 +++++++++--
.../modules/orchestration/DagActionStore.java | 5 +-
.../orchestration/DagManagementTaskStreamImpl.java | 106 ++++++++---
.../service/modules/orchestration/DagManager.java | 6 +-
.../modules/orchestration/FlowLaunchHandler.java | 6 +-
... FlowLaunchMultiActiveLeaseArbiterFactory.java} | 26 +--
.../MostlyMySqlDagManagementStateStore.java | 2 +
.../MultiActiveLeaseArbiterFactory.java | 57 ++++++
.../modules/orchestration/MysqlDagActionStore.java | 7 +-
.../MysqlMultiActiveLeaseArbiter.java | 19 +-
.../modules/orchestration/Orchestrator.java | 2 +-
.../ReminderSettingDagProcLeaseArbiter.java | 197 ---------------------
.../modules/orchestration/proc/DagProc.java | 14 +-
.../modules/orchestration/proc/LaunchDagProc.java | 18 +-
.../modules/orchestration/task/DagTask.java | 12 +-
.../modules/orchestration/task/LaunchDagTask.java | 5 +-
...lowExecutionResourceHandlerWithWarmStandby.java | 2 +-
.../monitoring/DagActionStoreChangeMonitor.java | 7 +-
.../gobblin/service/GobblinServiceManagerTest.java | 14 ++
.../DagActionReminderSchedulerTest.java | 73 ++++++++
.../DagManagementTaskStreamImplTest.java | 12 +-
.../modules/orchestration/DagManagerFlowTest.java | 14 +-
.../orchestration/DagProcessingEngineTest.java | 14 +-
.../orchestration/MysqlDagActionStoreTest.java | 8 +-
.../MysqlMultiActiveLeaseArbiterTest.java | 8 +-
.../orchestration/proc/LaunchDagProcTest.java | 6 +-
32 files changed, 497 insertions(+), 363 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index d7204df8f..6f4b100b1 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -100,13 +100,13 @@ public class ConfigurationKeys {
public static final String
MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS_KEY =
MYSQL_DAG_ACTION_STORE_PREFIX + "retentionPeriodSeconds";
public static final long
DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = 3 * 24 * 60 *
60; // (3 days in seconds)
// Scheduler lease determination store configuration
+ public static final String SCHEDULER_LEASE_ARBITER_NAME =
"SchedulerFlowLaunchLeaseArbiter";
+ public static final String PROCESSING_LEASE_ARBITER_NAME =
"DagActionProcessingLeaseArbiter";
public static final String MYSQL_LEASE_ARBITER_PREFIX =
"MysqlMultiActiveLeaseArbiter";
- public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
- public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE
= "gobblin_multi_active_scheduler_constants_store";
- public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY
= MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiter.store.db.table";
- public static final String
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE =
"gobblin_scheduler_lease_determination_store";
- public static final String
SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY =
MYSQL_LEASE_ARBITER_PREFIX + ".retentionPeriodMillis";
- public static final long
DEFAULT_SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS = 3 * 24 *
60 * 60 * 1000; // (3 days in ms)
+ public static final String MULTI_ACTIVE_CONSTANTS_DB_TABLE_KEY =
MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
+ public static final String LEASE_DETERMINATION_STORE_DB_TABLE_KEY =
MYSQL_LEASE_ARBITER_PREFIX + "." + STATE_STORE_DB_TABLE_KEY;
+ public static final String
LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY =
MYSQL_LEASE_ARBITER_PREFIX + ".retentionPeriodMillis";
+ public static final long
DEFAULT_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS = 3 * 24 * 60 * 60 *
1000; // (3 days in ms)
// Refers to the event we originally tried to acquire a lease which achieved
`consensus` among participants through
// the database
public static final String
SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY =
"preservedConsensusEventTimeMillis";
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
index b628f1714..60eeb154a 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
@@ -23,6 +23,11 @@
"type" : "string",
"doc" : "flow execution id for the dag action",
"compliance" : "NONE"
+ }, {
+ "name" : "jobName",
+ "type" : "string",
+ "doc" : "job name for the dag action (empty string if not applicable)",
+ "compliance" : "NONE"
}, {
"name" : "dagAction",
"type": {
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
index 71c97f0e5..78083c065 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
@@ -206,7 +206,8 @@ public class DagActionStoreChangeMonitorTest {
GenericStoreChangeEvent genericStoreChangeEvent =
new GenericStoreChangeEvent(key, String.valueOf(txidCounter),
System.currentTimeMillis(), operationType);
txidCounter++;
- return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup,
flowName, flowExecutionId, dagAction);
+ return new DagActionStoreChangeEvent(genericStoreChangeEvent, flowGroup,
flowName, flowExecutionId,
+ DagActionStore.NO_JOB_NAME_DEFAULT, dagAction);
}
/**
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index c01ff3e62..ed179fa58 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -37,11 +37,13 @@ import com.typesafe.config.Config;
import javax.inject.Singleton;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.restli.EmbeddedRestliServer;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
+import
org.apache.gobblin.service.modules.orchestration.DagActionProcessingMultiActiveLeaseArbiterFactory;
+import
org.apache.gobblin.service.modules.orchestration.FlowLaunchMultiActiveLeaseArbiterFactory;
import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.MultiActiveLeaseArbiter;
-import
org.apache.gobblin.service.modules.orchestration.MysqlMultiActiveLeaseArbiter;
import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
import org.apache.gobblin.runtime.instance.StandardGobblinInstanceLauncher;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
@@ -77,7 +79,6 @@ import
org.apache.gobblin.service.modules.orchestration.DagProcessingEngine;
import org.apache.gobblin.service.modules.orchestration.DagTaskStream;
import
org.apache.gobblin.service.modules.orchestration.MostlyMySqlDagManagementStateStore;
import
org.apache.gobblin.service.modules.orchestration.DagActionReminderScheduler;
-import
org.apache.gobblin.service.modules.orchestration.ReminderSettingDagProcLeaseArbiter;
import org.apache.gobblin.service.modules.orchestration.FlowLaunchHandler;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
@@ -108,8 +109,6 @@ import org.apache.gobblin.util.ConfigUtils;
public class GobblinServiceGuiceModule implements Module {
- public static final String SCHEDULER_LEASE_ARBITER_NAME =
"SchedulerLeaseArbiter";
- public static final String EXECUTOR_LEASE_ARBITER_NAME =
"ExecutorLeaseArbiter";
private static final Logger LOGGER =
LoggerFactory.getLogger(GobblinServiceGuiceModule.class);
private static final String JOB_STATUS_RETRIEVER_CLASS_KEY =
"jobStatusRetriever.class";
@@ -190,7 +189,9 @@ public class GobblinServiceGuiceModule implements Module {
OptionalBinder.newOptionalBinder(binder, MultiActiveLeaseArbiter.class);
OptionalBinder.newOptionalBinder(binder, FlowLaunchHandler.class);
if (serviceConfig.isMultiActiveSchedulerEnabled()) {
-
binder.bind(MultiActiveLeaseArbiter.class).annotatedWith(Names.named(SCHEDULER_LEASE_ARBITER_NAME)).to(MysqlMultiActiveLeaseArbiter.class);
+ binder.bind(MultiActiveLeaseArbiter.class).annotatedWith(Names.named(
+ ConfigurationKeys.SCHEDULER_LEASE_ARBITER_NAME)).toProvider(
+ FlowLaunchMultiActiveLeaseArbiterFactory.class);
binder.bind(FlowLaunchHandler.class);
}
@@ -202,21 +203,25 @@ public class GobblinServiceGuiceModule implements Module {
OptionalBinder.newOptionalBinder(binder, DagManagementStateStore.class);
OptionalBinder.newOptionalBinder(binder, DagProcFactory.class);
OptionalBinder.newOptionalBinder(binder, DagProcessingEngine.class);
+ OptionalBinder.newOptionalBinder(binder, DagActionReminderScheduler.class);
if (serviceConfig.isDagProcessingEngineEnabled()) {
- binder.bind(DagManagement.class).to(DagManagementTaskStreamImpl.class);
- binder.bind(DagTaskStream.class).to(DagManagementTaskStreamImpl.class);
-
binder.bind(DagManagementStateStore.class).to(MostlyMySqlDagManagementStateStore.class).in(Singleton.class);
- binder.bind(DagProcFactory.class);
- binder.bind(DagProcessingEngine.class);
-
+ /* Note that two instances of the same class can only be differentiated
with an `annotatedWith` marker provided at
+ binding time (optionally bound classes cannot have names associated with
them). Unlike, the scheduler lease arbiter,
+ the execution lease arbiter is used in single-active or multi-active
execution. */
+ binder.bind(MultiActiveLeaseArbiter.class).
+
annotatedWith(Names.named(ConfigurationKeys.PROCESSING_LEASE_ARBITER_NAME))
+ .toProvider(
+ DagActionProcessingMultiActiveLeaseArbiterFactory.class);
// Multi-active execution is only compatible with dagProcessingEngine
configuration
- OptionalBinder.newOptionalBinder(binder,
ReminderSettingDagProcLeaseArbiter.class);
- OptionalBinder.newOptionalBinder(binder,
DagActionReminderScheduler.class);
if (serviceConfig.isMultiActiveExecutionEnabled()) {
-
binder.bind(MultiActiveLeaseArbiter.class).annotatedWith(Names.named(EXECUTOR_LEASE_ARBITER_NAME)).to(MysqlMultiActiveLeaseArbiter.class);
binder.bind(DagActionReminderScheduler.class);
- binder.bind(ReminderSettingDagProcLeaseArbiter.class);
}
+
+ binder.bind(DagManagement.class).to(DagManagementTaskStreamImpl.class);
+ binder.bind(DagTaskStream.class).to(DagManagementTaskStreamImpl.class);
+
binder.bind(DagManagementStateStore.class).to(MostlyMySqlDagManagementStateStore.class);
+ binder.bind(DagProcFactory.class);
+ binder.bind(DagProcessingEngine.class);
}
binder.bind(FlowConfigsResource.class);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index e65d65f38..a5e8f102e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -116,6 +116,7 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
public static final String SERVICE_EVENT_BUS_NAME =
"GobblinServiceManagerEventBus";
private static final Logger LOGGER =
LoggerFactory.getLogger(GobblinServiceManager.class);
+ private static volatile GobblinServiceGuiceModule
GOBBLIN_SERVICE_GUICE_MODULE;
protected final ServiceBasedAppLauncher serviceLauncher;
private volatile boolean stopInProgress = false;
@@ -250,11 +251,28 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
return create(new GobblinServiceConfiguration(serviceName, serviceId,
config, serviceWorkDir));
}
+ /**
+ * Uses the provided serviceConfiguration to create a new Guice module and
obtain a new class associated with it.
+ * This method should only be called once per application.
+ */
public static GobblinServiceManager create(GobblinServiceConfiguration
serviceConfiguration) {
- GobblinServiceGuiceModule guiceModule = new
GobblinServiceGuiceModule(serviceConfiguration);
+ GOBBLIN_SERVICE_GUICE_MODULE = new
GobblinServiceGuiceModule(serviceConfiguration);
+ return getClass(GobblinServiceManager.class);
+ }
- Injector injector = Guice.createInjector(Stage.PRODUCTION, guiceModule);
- return injector.getInstance(GobblinServiceManager.class);
+ /**
+ *
+ * @param classToGet
+ * @return a new object if the class type is not marked with @Singleton,
otherwise the same instance of the class
+ * @param <T>
+ */
+ public static <T> T getClass(Class<T> classToGet) {
+ if (GOBBLIN_SERVICE_GUICE_MODULE == null) {
+ throw new RuntimeException(String.format("getClass called to obtain %s
without calling create method to "
+ + "initialize GobblinServiceGuiceModule.", classToGet));
+ }
+ Injector injector = Guice.createInjector(Stage.PRODUCTION,
GOBBLIN_SERVICE_GUICE_MODULE);
+ return injector.getInstance(classToGet);
}
public URI getRestLiServerListeningURI() {
@@ -633,14 +651,8 @@ public class GobblinServiceManager implements
ApplicationLauncher, StandardMetri
Config config = ConfigFactory.load();
- GobblinServiceConfiguration serviceConfiguration =
- new
GobblinServiceConfiguration(cmd.getOptionValue(SERVICE_NAME_OPTION_NAME),
getServiceId(cmd), config,
- null);
-
- GobblinServiceGuiceModule guiceModule = new
GobblinServiceGuiceModule(serviceConfiguration);
- Injector injector = Guice.createInjector(guiceModule);
-
- try (GobblinServiceManager gobblinServiceManager =
injector.getInstance(GobblinServiceManager.class)) {
+ try (GobblinServiceManager gobblinServiceManager =
+ create(cmd.getOptionValue(SERVICE_NAME_OPTION_NAME),
getServiceId(cmd), config, null)) {
gobblinServiceManager.start();
if (isTestMode) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionProcessingMultiActiveLeaseArbiterFactory.java
similarity index 55%
copy from
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
copy to
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionProcessingMultiActiveLeaseArbiterFactory.java
index 98ede576b..2ca52b137 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionProcessingMultiActiveLeaseArbiterFactory.java
@@ -15,23 +15,25 @@
* limitations under the License.
*/
-package org.apache.gobblin.service.modules.orchestration.task;
+package org.apache.gobblin.service.modules.orchestration;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
-import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
-import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
/**
- * A {@link DagTask} responsible to handle launch tasks.
+ * A factory implementation that returns a {@link MultiActiveLeaseArbiter}
instance used by the
+ * {@link DagManagementTaskStreamImpl} in multi-active execution mode
*/
+@Slf4j
+public class DagActionProcessingMultiActiveLeaseArbiterFactory extends
MultiActiveLeaseArbiterFactory {
-public class LaunchDagTask extends DagTask {
- public LaunchDagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus) {
- super(dagAction, leaseObtainedStatus);
- }
-
- public <T> T host(DagTaskVisitor<T> visitor) {
- return visitor.meet(this);
+ @Inject
+ public DagActionProcessingMultiActiveLeaseArbiterFactory(Config config) {
+ super(config, ConfigurationKeys.PROCESSING_LEASE_ARBITER_NAME);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
index ddcd1fa79..9334f796b 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
@@ -17,15 +17,26 @@
package org.apache.gobblin.service.modules.orchestration;
-import java.util.Optional;
+import java.io.IOException;
+import java.util.Date;
+import java.util.function.Supplier;
+import org.quartz.Job;
+import org.quartz.JobBuilder;
+import org.quartz.JobDataMap;
import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;
import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
/**
@@ -36,14 +47,12 @@ import javax.inject.Inject;
public class DagActionReminderScheduler {
public static final String DAG_ACTION_REMINDER_SCHEDULER_KEY =
"DagActionReminderScheduler";
private final Scheduler quartzScheduler;
- private final Optional<DagManagement> dagManagement;
@Inject
- public DagActionReminderScheduler(StdSchedulerFactory schedulerFactory,
Optional<DagManagement> dagManagement)
+ public DagActionReminderScheduler(StdSchedulerFactory schedulerFactory)
throws SchedulerException {
// Create a new Scheduler to be used solely for the DagProc reminders
this.quartzScheduler =
schedulerFactory.getScheduler(DAG_ACTION_REMINDER_SCHEDULER_KEY);
- this.dagManagement = dagManagement;
}
/**
@@ -55,20 +64,88 @@ public class DagActionReminderScheduler {
*/
public void scheduleReminder(DagActionStore.DagAction dagAction, long
reminderDurationMillis)
throws SchedulerException {
- if (!dagManagement.isPresent()) {
- throw new RuntimeException("DagManagement not initialized in
multi-active execution mode when required.");
- }
- JobDetail jobDetail =
ReminderSettingDagProcLeaseArbiter.createReminderJobDetail(dagManagement.get(),
dagAction);
- Trigger trigger =
ReminderSettingDagProcLeaseArbiter.createReminderJobTrigger(dagAction,
reminderDurationMillis);
+ JobDetail jobDetail = createReminderJobDetail(dagAction);
+ Trigger trigger = createReminderJobTrigger(dagAction,
reminderDurationMillis, System::currentTimeMillis);
quartzScheduler.scheduleJob(jobDetail, trigger);
}
public void unscheduleReminderJob(DagActionStore.DagAction dagAction) throws
SchedulerException {
- if (!dagManagement.isPresent()) {
- throw new RuntimeException("DagManagement not initialized in
multi-active execution mode when required.");
- }
- JobDetail jobDetail =
ReminderSettingDagProcLeaseArbiter.createReminderJobDetail(dagManagement.get(),
dagAction);
+ JobDetail jobDetail = createReminderJobDetail(dagAction);
quartzScheduler.deleteJob(jobDetail.getKey());
}
+ /**
+ * Static class used to store information regarding a pending dagAction that
needs to be revisited at a later time
+ * by {@link DagManagement} interface to re-attempt a lease on if it has not
been completed by the previous owner.
+ * These jobs are scheduled and used by the {@link
DagActionReminderScheduler}.
+ */
+ @Slf4j
+ public static class ReminderJob implements Job {
+ public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
+
+ @Override
+ public void execute(JobExecutionContext context) {
+ // Get properties from the trigger to create a dagAction
+ JobDataMap jobDataMap = context.getTrigger().getJobDataMap();
+ String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
+ String flowGroup =
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+ String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
+ String flowId =
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+ DagActionStore.DagActionType dagActionType =
DagActionStore.DagActionType.valueOf(
+ jobDataMap.getString(FLOW_ACTION_TYPE_KEY));
+
+ log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ",
flowName: " + flowName
+ + ", flowExecutionId: " + flowId + ", jobName: " + jobName +")");
+
+ DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowId, jobName,
+ dagActionType);
+
+ try {
+ DagManagement dagManagement =
GobblinServiceManager.getClass(DagManagement.class);
+ dagManagement.addDagAction(dagAction);
+ } catch (IOException e) {
+ log.error("Failed to add DagAction to DagManagement. Action: {}",
dagAction);
+ }
+ }
+ }
+
+ /**
+ * Creates a key for the reminder job by concatenating all dagAction fields
+ */
+ public static String createDagActionReminderKey(DagActionStore.DagAction
dagAction) {
+ return String.format("%s.%s.%s.%s.%s", dagAction.getFlowGroup(),
dagAction.getFlowName(),
+ dagAction.getFlowExecutionId(), dagAction.getJobName(),
dagAction.getDagActionType());
+ }
+
+ /**
+ * Creates a jobDetail containing flow and job identifying information in
the jobDataMap, uniquely identified
+ * by a key comprised of the dagAction's fields.
+ */
+ public static JobDetail createReminderJobDetail(DagActionStore.DagAction
dagAction) {
+ JobDataMap dataMap = new JobDataMap();
+ dataMap.put(ConfigurationKeys.FLOW_NAME_KEY, dagAction.getFlowName());
+ dataMap.put(ConfigurationKeys.FLOW_GROUP_KEY, dagAction.getFlowGroup());
+ dataMap.put(ConfigurationKeys.JOB_NAME_KEY, dagAction.getJobName());
+ dataMap.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
dagAction.getFlowExecutionId());
+ dataMap.put(ReminderJob.FLOW_ACTION_TYPE_KEY,
dagAction.getDagActionType());
+
+ return JobBuilder.newJob(ReminderJob.class)
+ .withIdentity(createDagActionReminderKey(dagAction),
dagAction.getFlowGroup())
+ .usingJobData(dataMap)
+ .build();
+ }
+
+ /**
+ * Creates a Trigger object with the same key as the ReminderJob (since only
one trigger is expected to be associated
+ * with a job at any given time) that should fire after
`reminderDurationMillis` millis. It uses
+ * `getCurrentTimeMillis` to determine the current time.
+ */
+ public static Trigger createReminderJobTrigger(DagActionStore.DagAction
dagAction, long reminderDurationMillis,
+ Supplier<Long> getCurrentTimeMillis) {
+ Trigger trigger = TriggerBuilder.newTrigger()
+ .withIdentity(createDagActionReminderKey(dagAction),
dagAction.getFlowGroup())
+ .startAt(new Date(getCurrentTimeMillis.get() + reminderDurationMillis))
+ .build();
+ return trigger;
+ }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
index 3076486f7..94beebd16 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
@@ -28,6 +28,7 @@ import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
public interface DagActionStore {
+ public static final String NO_JOB_NAME_DEFAULT = "";
enum DagActionType {
KILL, // Kill invoked through API call
RESUME, // Resume flow invoked through API call
@@ -98,7 +99,7 @@ public interface DagActionStore {
* @param dagActionType the value of the dag action
* @throws IOException
*/
- void addDagAction(String flowGroup, String flowName, String flowExecutionId,
String jobName, DagActionType dagActionType) throws IOException;
+ void addJobDagAction(String flowGroup, String flowName, String
flowExecutionId, String jobName, DagActionType dagActionType) throws
IOException;
/**
* Persist the dag action in {@link DagActionStore} for durability. This
method assumes an empty jobName.
@@ -108,7 +109,7 @@ public interface DagActionStore {
* @param dagActionType the value of the dag action
* @throws IOException
*/
- void addDagAction(String flowGroup, String flowName, String flowExecutionId,
DagActionType dagActionType) throws IOException;
+ void addFlowDagAction(String flowGroup, String flowName, String
flowExecutionId, DagActionType dagActionType) throws IOException;
/**
* delete the dag action from {@link DagActionStore}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index a6778c82f..16d396394 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -17,22 +17,28 @@
package org.apache.gobblin.service.modules.orchestration;
+import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import org.quartz.SchedulerException;
+
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import javax.inject.Named;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.util.InjectionNames;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.util.ConfigUtils;
@@ -40,7 +46,20 @@ import org.apache.gobblin.util.ConfigUtils;
/**
* DagManagementTaskStreamImpl implements {@link DagManagement} and {@link
DagTaskStream}. It accepts
- * {@link DagActionStore.DagAction}s and iteratively provides {@link DagTask}.
+ * {@link
org.apache.gobblin.service.modules.orchestration.DagActionStore.DagAction}s and
iteratively provides
+ * {@link DagTask}.
+ *
+ * It uses {@link MultiActiveLeaseArbiter} to coordinate multiple hosts with
execution components enabled in
+ * multi-active execution mode to respond to flow action events by attempting
ownership over a flow action event at a
+ * given event time. Only events that the current instance acquires a lease
for are selected by
+ * {@link DagManagementTaskStreamImpl#next()}. If the status of the lease
ownership attempt is anything other than an
+ * indication the lease has been completed
+ * ({@link LeaseAttemptStatus}) then the {@link
MultiActiveLeaseArbiter#tryAcquireLease} method will set a reminder for
+ * the flow action using {@link DagActionReminderScheduler} to reattempt the
lease after the current leaseholder's grant
+ * would have expired.
+ * Note that if multi-active execution is NOT enabled, then all flow action
events are selected by
+ * {@link DagManagementTaskStreamImpl#next()} by virtue of having no other
contenders for the lease at the time
+ * {@link MultiActiveLeaseArbiter#tryAcquireLease} is called.
*/
@Slf4j
@Singleton
@@ -51,20 +70,34 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
@Inject(optional=true)
protected Optional<DagActionStore> dagActionStore;
- protected Optional<ReminderSettingDagProcLeaseArbiter>
reminderSettingDagProcLeaseArbiter;
+ protected MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter;
+ protected Optional<DagActionReminderScheduler> dagActionReminderScheduler;
+ private final boolean isMultiActiveExecutionEnabled;
@Inject
private static final int MAX_HOUSEKEEPING_THREAD_DELAY = 180;
private final BlockingQueue<DagActionStore.DagAction> dagActionQueue = new
LinkedBlockingQueue<>();
- // TODO: need to pass reference to DagProcLeaseArbiter without creating a
circular reference in Guice
@Inject
public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore>
dagActionStore,
- Optional<ReminderSettingDagProcLeaseArbiter>
reminderSettingDagProcLeaseArbiter) {
+ @Named(ConfigurationKeys.PROCESSING_LEASE_ARBITER_NAME)
MultiActiveLeaseArbiter dagActionProcessingLeaseArbiter,
+ Optional<DagActionReminderScheduler> dagActionReminderScheduler,
+ @Named(InjectionNames.MULTI_ACTIVE_EXECUTION_ENABLED) boolean
isMultiActiveExecutionEnabled) {
this.config = config;
+ if (!dagActionStore.isPresent()) {
+ /* DagActionStore is optional because there are other configurations
that do not require it and it's initialized
+ in {@link GobblinServiceGuiceModule} which handles all possible
configurations */
+ throw new RuntimeException("DagProcessingEngine should not be enabled
without dagActionStore enabled.");
+ }
this.dagActionStore = dagActionStore;
- this.reminderSettingDagProcLeaseArbiter =
reminderSettingDagProcLeaseArbiter;
+ this.dagActionProcessingLeaseArbiter = dagActionProcessingLeaseArbiter;
+ this.dagActionReminderScheduler = dagActionReminderScheduler;
+ this.isMultiActiveExecutionEnabled = isMultiActiveExecutionEnabled;
MetricContext metricContext =
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()),
getClass());
this.eventSubmitter = new EventSubmitter.Builder(metricContext,
"org.apache.gobblin.service").build();
+ if (this.isMultiActiveExecutionEnabled &&
!this.dagActionReminderScheduler.isPresent()) {
+ throw new RuntimeException(String.format("Multi-active execution enabled
but required "
+ + "instance %s is absent.",
DagActionReminderScheduler.class.getSimpleName()));
+ }
}
@Override
@@ -84,26 +117,42 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
@Override
public DagTask next() {
- /*TODO: this requires use of lease arbiter, in single-active execution
lease arbiter will not be present and we can
- provide a dummy LeaseObtainedStatus or create alternate route
- */
- if (!this.reminderSettingDagProcLeaseArbiter.isPresent()) {
- throw new RuntimeException("DagManagement not initialized in
multi-active execution mode when required.");
- }
- try {
- LeaseAttemptStatus leaseAttemptStatus = null;
- DagActionStore.DagAction dagAction = null;
- while (!(leaseAttemptStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus)) {
- dagAction = this.dagActionQueue.take(); //`take` blocks till element
is not available
- // TODO: need to handle reminder events and flag them
- leaseAttemptStatus =
this.reminderSettingDagProcLeaseArbiter.get().tryAcquireLease(dagAction,
System.currentTimeMillis(), false, false);
+ while (true) {
+ try {
+ DagActionStore.DagAction dagAction = this.dagActionQueue.take();
+ LeaseAttemptStatus leaseAttemptStatus =
retrieveLeaseStatus(dagAction);
+ if (leaseAttemptStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus) {
+ return createDagTask(dagAction,
(LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus);
+ }
+ } catch (Exception e) {
+ //TODO: need to handle exceptions gracefully
+ log.error("Exception getting DagAction from the queue / creating
DagTask", e);
+ }
}
- return createDagTask(dagAction, (LeaseAttemptStatus.LeaseObtainedStatus)
leaseAttemptStatus);
- } catch (Throwable t) {
- //TODO: need to handle exceptions gracefully
- log.error("Error getting DagAction from the queue / creating DagTask",
t);
+ }
+
+ /**
+ * Returns a {@link LeaseAttemptStatus} associated with the
+ * `dagAction` by calling
+ * {@link MultiActiveLeaseArbiter#tryAcquireLease(DagActionStore.DagAction,
long, boolean, boolean)}.
+ * @param dagAction
+ * @return
+ * @throws IOException
+ * @throws SchedulerException
+ */
+ private LeaseAttemptStatus retrieveLeaseStatus(DagActionStore.DagAction
dagAction)
+ throws IOException, SchedulerException {
+ LeaseAttemptStatus leaseAttemptStatus;
+ // TODO: need to handle reminder events and flag them
+ leaseAttemptStatus = this.dagActionProcessingLeaseArbiter
+ .tryAcquireLease(dagAction, System.currentTimeMillis(), false, false);
+ /* Schedule a reminder for the event unless the lease has been
completed to safeguard against the case where even
+ we, when we might become the lease owner still fail to complete
processing
+ */
+ if (!(leaseAttemptStatus instanceof
LeaseAttemptStatus.NoLongerLeasingStatus)) {
+ scheduleReminderForEvent(leaseAttemptStatus);
}
- return null;
+ return leaseAttemptStatus;
}
private DagTask createDagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus) {
@@ -111,9 +160,18 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
switch (dagActionType) {
case LAUNCH:
- return new LaunchDagTask(dagAction, leaseObtainedStatus);
+ return new LaunchDagTask(dagAction, leaseObtainedStatus,
dagActionStore.get());
default:
throw new UnsupportedOperationException("Not yet implemented");
}
}
+
+ /* Schedules a reminder for the flow action using {@link
DagActionReminderScheduler} to reattempt the lease after the
+ current leaseholder's grant would have expired.
+ */
+ protected void scheduleReminderForEvent(LeaseAttemptStatus leaseStatus)
+ throws SchedulerException {
+
dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getDagAction(),
+ leaseStatus.getMinimumLingerDurationMillis());
+ }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 6ce85cfd3..7fd90a244 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -86,6 +86,7 @@ import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import static org.apache.gobblin.service.ExecutionStatus.*;
+import static
org.apache.gobblin.service.modules.orchestration.DagActionStore.NO_JOB_NAME_DEFAULT;
/**
@@ -183,8 +184,7 @@ public class DagManager extends AbstractIdleService {
}
DagActionStore.DagAction toDagAction(DagActionStore.DagActionType
actionType) {
- // defaults to empty jobName
- return new DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, "", actionType);
+ return new DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, NO_JOB_NAME_DEFAULT, actionType);
}
}
@@ -591,7 +591,7 @@ public class DagManager extends AbstractIdleService {
private void removeDagActionFromStore(DagId dagId,
DagActionStore.DagActionType dagActionType) throws IOException {
if (this.dagActionStore.isPresent()) {
this.dagActionStore.get().deleteDagAction(
- new DagActionStore.DagAction(dagId.flowGroup, dagId.flowName,
dagId.flowExecutionId, "", dagActionType));
+ new DagActionStore.DagAction(dagId.flowGroup, dagId.flowName,
dagId.flowExecutionId, NO_JOB_NAME_DEFAULT, dagActionType));
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
index b407c445c..6f6d920d4 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
@@ -50,7 +50,6 @@ import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.scheduler.JobScheduler;
import org.apache.gobblin.scheduler.SchedulerService;
-import org.apache.gobblin.service.modules.core.GobblinServiceGuiceModule;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.util.ConfigUtils;
@@ -79,7 +78,8 @@ public class FlowLaunchHandler {
private ContextAwareCounter failedToSetEventReminderCount;
@Inject
- public FlowLaunchHandler(Config config,
@Named(GobblinServiceGuiceModule.SCHEDULER_LEASE_ARBITER_NAME)
Optional<MultiActiveLeaseArbiter> leaseArbiter,
+ public FlowLaunchHandler(Config config,
+ @Named(ConfigurationKeys.SCHEDULER_LEASE_ARBITER_NAME)
Optional<MultiActiveLeaseArbiter> leaseArbiter,
SchedulerService schedulerService, Optional<DagActionStore>
dagActionStore) {
this.multiActiveLeaseArbiter = leaseArbiter;
this.dagActionStore = dagActionStore;
@@ -140,7 +140,7 @@ public class FlowLaunchHandler {
if (this.dagActionStore.isPresent() &&
this.multiActiveLeaseArbiter.isPresent()) {
try {
DagActionStore.DagAction dagAction = leaseStatus.getDagAction();
- this.dagActionStore.get().addDagAction(dagAction.getFlowGroup(),
dagAction.getFlowName(), dagAction.getFlowExecutionId(),
dagAction.getDagActionType());
+ this.dagActionStore.get().addFlowDagAction(dagAction.getFlowGroup(),
dagAction.getFlowName(), dagAction.getFlowExecutionId(),
dagAction.getDagActionType());
// If the dag action has been persisted to the {@link DagActionStore}
we can close the lease
this.numFlowsSubmitted.mark();
return
this.multiActiveLeaseArbiter.get().recordLeaseSuccess(leaseStatus);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchMultiActiveLeaseArbiterFactory.java
similarity index 56%
copy from
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
copy to
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchMultiActiveLeaseArbiterFactory.java
index 98ede576b..86b576467 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchMultiActiveLeaseArbiterFactory.java
@@ -15,23 +15,25 @@
* limitations under the License.
*/
-package org.apache.gobblin.service.modules.orchestration.task;
+package org.apache.gobblin.service.modules.orchestration;
-import org.apache.gobblin.service.modules.orchestration.DagActionStore;
-import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
-import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
/**
- * A {@link DagTask} responsible to handle launch tasks.
+ * A factory implementation that returns a {@link MultiActiveLeaseArbiter}
instance used by the
+ * {@link FlowLaunchHandler} in multi-active scheduler mode
*/
+@Slf4j
+public class FlowLaunchMultiActiveLeaseArbiterFactory extends
MultiActiveLeaseArbiterFactory {
-public class LaunchDagTask extends DagTask {
- public LaunchDagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus) {
- super(dagAction, leaseObtainedStatus);
- }
-
- public <T> T host(DagTaskVisitor<T> visitor) {
- return visitor.meet(this);
+ @Inject
+ public FlowLaunchMultiActiveLeaseArbiterFactory(Config config) {
+ super(config, ConfigurationKeys.SCHEDULER_LEASE_ARBITER_NAME);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
index 7ddfdf616..029dd35b8 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MostlyMySqlDagManagementStateStore.java
@@ -35,6 +35,7 @@ import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.typesafe.config.Config;
+import javax.inject.Singleton;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -60,6 +61,7 @@ import
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
* They are used here to provide complete access to dag related information at
one place.
*/
@Slf4j
+@Singleton
public class MostlyMySqlDagManagementStateStore implements
DagManagementStateStore {
private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>>
jobToDag = new ConcurrentHashMap<>();
private final Map<DagNodeId, Dag.DagNode<JobExecutionPlan>> dagNodes = new
ConcurrentHashMap<>();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiterFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiterFactory.java
new file mode 100644
index 000000000..64c353371
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiterFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Objects;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Provider;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * An abstract base class for {@link MultiActiveLeaseArbiter} factories that
use a specific configuration key.
+ * Subclasses must provide a key to use in the constructor.
+ */
+@Slf4j
+public abstract class MultiActiveLeaseArbiterFactory implements
Provider<MultiActiveLeaseArbiter> {
+ private final Config leaseArbiterConfig;
+ private final String configPrefix;
+
+ public MultiActiveLeaseArbiterFactory(Config config, String configPrefix) {
+ Objects.requireNonNull(config);
+ this.configPrefix = Objects.requireNonNull(configPrefix);
+ if (!config.hasPath(configPrefix)) {
+ throw new RuntimeException(String.format("Unable to initialize
multiActiveLeaseArbiter due to missing "
+ + "configurations that should be prefixed by %s.", configPrefix));
+ }
+ this.leaseArbiterConfig = config.getConfig(configPrefix);
+ log.info("Lease arbiter will be initialized with config {}",
leaseArbiterConfig);
+ }
+
+ @Override
+ public MultiActiveLeaseArbiter get() {
+ try {
+ return new InstrumentedLeaseArbiter(this.leaseArbiterConfig, new
MysqlMultiActiveLeaseArbiter(leaseArbiterConfig), configPrefix);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to initialize " + configPrefix + "
lease arbiter due to ", e);
+ }
+ }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
index 1481f097e..f0b93e059 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
@@ -45,7 +45,6 @@ import org.apache.gobblin.util.ExponentialBackoff;
public class MysqlDagActionStore implements DagActionStore {
public static final String CONFIG_PREFIX = "MysqlDagActionStore";
- public static final String NO_JOB_NAME_DEFAULT = "";
protected final DataSource dataSource;
private final DBStatementExecutor dbStatementExecutor;
@@ -128,7 +127,7 @@ public class MysqlDagActionStore implements DagActionStore {
}
@Override
- public void addDagAction(String flowGroup, String flowName, String
flowExecutionId, String jobName, DagActionType dagActionType)
+ public void addJobDagAction(String flowGroup, String flowName, String
flowExecutionId, String jobName, DagActionType dagActionType)
throws IOException {
dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT,
tableName), insertStatement -> {
try {
@@ -146,9 +145,9 @@ public class MysqlDagActionStore implements DagActionStore {
}
@Override
- public void addDagAction(String flowGroup, String flowName, String
flowExecutionId, DagActionType dagActionType)
+ public void addFlowDagAction(String flowGroup, String flowName, String
flowExecutionId, DagActionType dagActionType)
throws IOException {
- addDagAction(flowGroup, flowName, flowExecutionId, NO_JOB_NAME_DEFAULT,
dagActionType);
+ addJobDagAction(flowGroup, flowName, flowExecutionId, NO_JOB_NAME_DEFAULT,
dagActionType);
}
@Override
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 03142ebbc..9dfea5acc 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -29,7 +29,6 @@ import java.util.Optional;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
-import com.google.inject.Inject;
import com.typesafe.config.Config;
import javax.sql.DataSource;
@@ -175,7 +174,6 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
private static final ThreadLocal<Calendar> UTC_CAL =
ThreadLocal.withInitial(() ->
Calendar.getInstance(TimeZone.getTimeZone("UTC")));
- @Inject
public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
if (config.hasPath(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX)) {
config =
config.getConfig(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX).withFallback(config);
@@ -184,17 +182,20 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
+ "before all properties",
ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX));
}
- // TODO: create two tables or take in table name as parameter to have
scheduler and executor table
- this.leaseArbiterTableName = ConfigUtils.getString(config,
ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY,
-
ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE);
- this.constantsTableName = ConfigUtils.getString(config,
ConfigurationKeys.MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY,
- ConfigurationKeys.DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE);
+ if
(!config.hasPath(ConfigurationKeys.LEASE_DETERMINATION_STORE_DB_TABLE_KEY)
+ ||
!config.hasPath(ConfigurationKeys.MULTI_ACTIVE_CONSTANTS_DB_TABLE_KEY)) {
+ throw new RuntimeException(String.format("Table names %s and %s are
required to be configured so multiple instances do not "
+ + "utilize the same table name",
ConfigurationKeys.LEASE_DETERMINATION_STORE_DB_TABLE_KEY,
+ ConfigurationKeys.MULTI_ACTIVE_CONSTANTS_DB_TABLE_KEY));
+ }
+ this.leaseArbiterTableName =
config.getString(ConfigurationKeys.LEASE_DETERMINATION_STORE_DB_TABLE_KEY);
+ this.constantsTableName =
config.getString(ConfigurationKeys.MULTI_ACTIVE_CONSTANTS_DB_TABLE_KEY);
this.epsilonMillis = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS);
this.lingerMillis = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
- this.retentionPeriodMillis = ConfigUtils.getLong(config,
ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY,
-
ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS);
+ this.retentionPeriodMillis = ConfigUtils.getLong(config,
ConfigurationKeys.LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY,
+
ConfigurationKeys.DEFAULT_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS);
this.thisTableRetentionStatement =
String.format(LEASE_ARBITER_TABLE_RETENTION_STATEMENT,
this.leaseArbiterTableName,
retentionPeriodMillis);
this.thisTableGetInfoStatement = String.format(GET_EVENT_INFO_STATEMENT,
this.leaseArbiterTableName,
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index f4977caa2..7540f31f1 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -234,7 +234,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
DagActionStore.DagAction launchDagAction =
- new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
"", DagActionStore.DagActionType.LAUNCH);
+ new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.LAUNCH);
// If multi-active scheduler is enabled do not pass onto DagManager,
otherwise scheduler forwards it directly
// Skip flow compilation as well, since we recompile after receiving
event from DagActionStoreChangeMonitor later
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/ReminderSettingDagProcLeaseArbiter.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/ReminderSettingDagProcLeaseArbiter.java
deleted file mode 100644
index 91e7f20b0..000000000
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/ReminderSettingDagProcLeaseArbiter.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.service.modules.orchestration;
-
-import java.io.IOException;
-import java.util.Date;
-import java.util.Optional;
-
-import org.quartz.Job;
-import org.quartz.JobBuilder;
-import org.quartz.JobDataMap;
-import org.quartz.JobDetail;
-import org.quartz.JobExecutionContext;
-import org.quartz.SchedulerException;
-import org.quartz.Trigger;
-import org.quartz.TriggerBuilder;
-
-import com.typesafe.config.Config;
-
-import javax.inject.Inject;
-import javax.inject.Named;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.service.modules.core.GobblinServiceGuiceModule;
-
-
-/**
- * Decorator used to coordinate multiple hosts with execution components
enabled to respond to flow action events with
- * added capabilities to properly handle the result of attempted ownership
over these flow action events. It uses the
- * {@link MultiActiveLeaseArbiter} to determine a single lease owner at a
given event time for a flow action event.
- * If the status of the lease ownership attempt is anything other than an
indication the lease has been completed
- * ({@link LeaseAttemptStatus.NoLongerLeasingStatus}) then the
- * {@link MultiActiveLeaseArbiter#tryAcquireLease} method will set a reminder
for the flow action using
- * {@link DagActionReminderScheduler} to reattempt the lease after the current
lease holder's grant would have expired.
- */
-@Slf4j
-public class ReminderSettingDagProcLeaseArbiter implements
MultiActiveLeaseArbiter {
- private final Optional<MultiActiveLeaseArbiter> decoratedLeaseArbiter;
- private final Optional<DagActionReminderScheduler>
dagActionReminderScheduler;
- private final Config config;
- private final String MISSING_OPTIONAL_ERROR_MESSAGE =
String.format("Multi-active execution is not enabled so dag "
- + "action should not passed to %s",
ReminderSettingDagProcLeaseArbiter.class.getSimpleName());
-
- @Inject
- public ReminderSettingDagProcLeaseArbiter(Config config,
- @Named(GobblinServiceGuiceModule.EXECUTOR_LEASE_ARBITER_NAME)
Optional<MultiActiveLeaseArbiter> leaseArbiter,
- Optional<DagActionReminderScheduler> dagActionReminderScheduler) {
- this.decoratedLeaseArbiter = leaseArbiter;
- this.dagActionReminderScheduler = dagActionReminderScheduler;
- this.config = config;
- }
-
- /**
- * Attempts a lease for a particular job event and sets a reminder to
revisit if the lease has not been completed.
- */
- @Override
- public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction
dagAction, long eventTimeMillis,
- boolean isReminderEvent, boolean skipFlowExecutionIdReplacement) {
- if (this.decoratedLeaseArbiter.isPresent()) {
- try {
- LeaseAttemptStatus leaseAttemptStatus =
- this.decoratedLeaseArbiter.get().tryAcquireLease(dagAction,
eventTimeMillis, isReminderEvent,
- skipFlowExecutionIdReplacement);
- /* Schedule a reminder for the event unless the lease has been completed
to safeguard against the case where even
- we, when we might become the lease owner still fail to complete
processing
- */
- if (!(leaseAttemptStatus instanceof
LeaseAttemptStatus.NoLongerLeasingStatus)) {
- scheduleReminderForEvent(leaseAttemptStatus);
- }
- return leaseAttemptStatus;
- } catch (SchedulerException | IOException e) {
- throw new RuntimeException(e);
- }
- } else {
- throw new RuntimeException(MISSING_OPTIONAL_ERROR_MESSAGE);
- }
- }
-
- @Override
- public boolean recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus
status)
- throws IOException {
- if (!this.decoratedLeaseArbiter.isPresent()) {
- throw new RuntimeException(MISSING_OPTIONAL_ERROR_MESSAGE);
- }
- return this.decoratedLeaseArbiter.get().recordLeaseSuccess(status);
- }
-
- protected void scheduleReminderForEvent(LeaseAttemptStatus leaseStatus)
- throws SchedulerException {
- if (!this.dagActionReminderScheduler.isPresent()) {
- throw new RuntimeException(MISSING_OPTIONAL_ERROR_MESSAGE);
- }
-
dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getDagAction(),
- leaseStatus.getMinimumLingerDurationMillis());
- }
-
- /**
- * Static class used to store information regarding a pending dagAction that
needs to be revisited at a later time
- * by {@link DagManagement} interface to re-attempt a lease on if it has not
been completed by the previous owner.
- * These jobs are scheduled and used by the {@link
DagActionReminderScheduler}.
- */
- @Slf4j
- public static class ReminderJob implements Job {
- public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
- public static final String DAG_MANAGEMENT_KEY = "dag.management";
-
- @Override
- public void execute(JobExecutionContext context) {
- // Get properties from the trigger to create a dagAction
- JobDataMap jobDataMap = context.getTrigger().getJobDataMap();
- String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
- String flowGroup =
jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
- String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
- String flowId =
jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
- DagActionStore.DagActionType dagActionType =
DagActionStore.DagActionType.valueOf(
- jobDataMap.getString(FLOW_ACTION_TYPE_KEY));
- DagManagement dagManagement = (DagManagement)
jobDataMap.get(DAG_MANAGEMENT_KEY);
-
- log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ",
flowName: " + flowName
- + ", flowExecutionId: " + flowId + ", jobName: " + jobName +")");
-
- DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowId, jobName,
- dagActionType);
-
- try {
- dagManagement.addDagAction(dagAction);
- } catch (IOException e) {
- log.error("Failed to add DagAction to DagManagement. Action: {}",
dagAction);
- }
- }
- }
-
- /**
- * Creates a key for the reminder job by concatenating all dagAction fields
- */
- public static String createDagActionReminderKey(DagActionStore.DagAction
dagAction) {
- return createDagActionReminderKey(dagAction.getFlowName(),
dagAction.getFlowGroup(), dagAction.getFlowExecutionId(),
- dagAction.getJobName(), dagAction.getDagActionType());
- }
-
- /**
- * Creates a key for the reminder job by concatenating flowName, flowGroup,
flowExecutionId, jobName, dagActionType
- * in that order
- */
- public static String createDagActionReminderKey(String flowName, String
flowGroup, String flowId, String jobName,
- DagActionStore.DagActionType dagActionType) {
- return String.format("%s.%s.%s.%s.%s", flowGroup, flowName, flowId,
jobName, dagActionType);
- }
-
- /**
- * Creates a jobDetail containing flow and job identifying information in
the jobDataMap, uniquely identified
- * by a key comprised of the dagAction's fields. It also serializes a
reference to the {@link DagManagement} object
- * to be referenced when the trigger fires.
- */
- public static JobDetail createReminderJobDetail(DagManagement dagManagement,
DagActionStore.DagAction dagAction) {
- JobDataMap dataMap = new JobDataMap();
- dataMap.put(ReminderJob.DAG_MANAGEMENT_KEY, dagManagement);
- dataMap.put(ConfigurationKeys.FLOW_NAME_KEY, dagAction.getFlowName());
- dataMap.put(ConfigurationKeys.FLOW_GROUP_KEY, dagAction.getFlowGroup());
- dataMap.put(ConfigurationKeys.JOB_NAME_KEY, dagAction.getJobName());
- dataMap.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
dagAction.getFlowExecutionId());
- dataMap.put(ReminderJob.FLOW_ACTION_TYPE_KEY,
dagAction.getDagActionType());
-
- return JobBuilder.newJob(ReminderJob.class)
- .withIdentity(createDagActionReminderKey(dagAction),
dagAction.getFlowName())
- .usingJobData(dataMap)
- .build();
- }
-
- /**
- * Creates a Trigger object with the same key as the ReminderJob (since only
one trigger is expected to be associated
- * with a job at any given time) that should fire after
`reminderDurationMillis` millis.
- */
- public static Trigger createReminderJobTrigger(DagActionStore.DagAction
dagAction, long reminderDurationMillis) {
- Trigger trigger = TriggerBuilder.newTrigger()
- .withIdentity(createDagActionReminderKey(dagAction),
dagAction.getFlowName())
- .startAt(new Date(System.currentTimeMillis() + reminderDurationMillis))
- .build();
- return trigger;
- }
-}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
index 1d9dc950c..6a5943445 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java
@@ -19,6 +19,9 @@ package org.apache.gobblin.service.modules.orchestration.proc;
import java.io.IOException;
+import lombok.AccessLevel;
+import lombok.Data;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
@@ -36,21 +39,25 @@ import
org.apache.gobblin.service.modules.orchestration.task.DagTask;
* actions based on the type of {@link DagTask} and finally submitting an
event to the executor.
*/
@Alpha
+@Data
@Slf4j
public abstract class DagProc<S, T> {
protected static final MetricContext metricContext =
Instrumented.getMetricContext(new State(), DagProc.class);
protected static final EventSubmitter eventSubmitter = new
EventSubmitter.Builder(
metricContext, "org.apache.gobblin.service").build();
+ @Getter(AccessLevel.PROTECTED)
+ private final DagTask dagTask;
public final void process(DagManagementStateStore dagManagementStateStore)
throws IOException {
S state = initialize(dagManagementStateStore); // todo - retry
T result = act(dagManagementStateStore, state); // todo - retry
- commit(dagManagementStateStore, result); // todo - retry
sendNotification(result, eventSubmitter); // todo - retry
log.info("{} successfully concluded actions for dagId : {}",
getClass().getSimpleName(), getDagId());
}
- protected abstract DagManager.DagId getDagId();
+ protected DagManager.DagId getDagId() {
+ return this.dagTask.getDagId();
+ }
protected abstract S initialize(DagManagementStateStore
dagManagementStateStore) throws IOException;
@@ -59,7 +66,4 @@ public abstract class DagProc<S, T> {
protected abstract void sendNotification(T result, EventSubmitter
eventSubmitter) throws IOException;
// todo - commit the modified dags to the persistent store, maybe not
required for InMem dagManagementStateStore
- protected void commit(DagManagementStateStore dagManagementStateStore, T
result) {
-
- }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
index ce9f56705..6368d8887 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicLong;
import com.google.common.collect.Maps;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -46,7 +45,6 @@ import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
-import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
@@ -58,28 +56,27 @@ import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
* An implementation for {@link DagProc} that launches a new job.
*/
@Slf4j
-@RequiredArgsConstructor
public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>,
Optional<Dag<JobExecutionPlan>>> {
- private final LaunchDagTask launchDagTask;
private final FlowCompilationValidationHelper
flowCompilationValidationHelper;
// todo - this is not orchestration delay and should be renamed. keeping it
the same because DagManager is also using
// the same name
private static final AtomicLong orchestrationDelayCounter = new
AtomicLong(0);
+
+ public LaunchDagProc(LaunchDagTask launchDagTask,
FlowCompilationValidationHelper flowCompilationValidationHelper) {
+ super(launchDagTask);
+ this.flowCompilationValidationHelper = flowCompilationValidationHelper;
+ }
+
static {
metricContext.register(
metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY,
orchestrationDelayCounter::get));
}
- @Override
- protected DagManager.DagId getDagId() {
- return this.launchDagTask.getDagId();
- }
-
@Override
protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore
dagManagementStateStore)
throws IOException {
try {
- DagActionStore.DagAction dagAction = this.launchDagTask.getDagAction();
+ DagActionStore.DagAction dagAction = this.getDagTask().getDagAction();
FlowSpec flowSpec = loadFlowSpec(dagManagementStateStore, dagAction);
flowSpec.addProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY,
dagAction.getFlowExecutionId());
return
this.flowCompilationValidationHelper.createExecutionPlanIfValid(flowSpec).toJavaUtil();
@@ -192,6 +189,7 @@ public class LaunchDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>, Opti
}
}
+ @Override
protected void sendNotification(Optional<Dag<JobExecutionPlan>> result,
EventSubmitter eventSubmitter) {
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
index 237eafdb3..732b9a226 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service.modules.orchestration.task;
import java.io.IOException;
import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
@@ -38,26 +39,31 @@ import
org.apache.gobblin.service.modules.orchestration.proc.DagProc;
*/
@Alpha
+@Slf4j
public abstract class DagTask {
@Getter public final DagActionStore.DagAction dagAction;
private final LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
+ private final DagActionStore dagActionStore;
@Getter protected final DagManager.DagId dagId;
- public DagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus) {
+ public DagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
+ DagActionStore dagActionStore) {
this.dagAction = dagAction;
this.leaseObtainedStatus = leaseObtainedStatus;
+ this.dagActionStore = dagActionStore;
this.dagId = DagManagerUtils.generateDagId(dagAction.getFlowGroup(),
dagAction.getFlowName(), dagAction.getFlowExecutionId());
}
public abstract <T> T host(DagTaskVisitor<T> visitor);
/**
- * Any cleanup work, e.g. releasing lease if it was acquired earlier, may be
done in this method.
+ * Any cleanup work, including removing the dagAction from the
dagActionStore and completing the lease acquired to
+ * work on this task, is done in this method.
* Returns true if concluding dag task finished successfully otherwise false.
*/
- // todo call it from the right place
public boolean conclude() {
try {
+ this.dagActionStore.deleteDagAction(this.dagAction);
return this.leaseObtainedStatus.completeLease();
} catch (IOException e) {
// TODO: Decide appropriate exception to throw and add to the commit
method's signature
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
index 98ede576b..88a620dda 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
@@ -27,8 +27,9 @@ import
org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
*/
public class LaunchDagTask extends DagTask {
- public LaunchDagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus) {
- super(dagAction, leaseObtainedStatus);
+ public LaunchDagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus,
+ DagActionStore dagActionStore) {
+ super(dagAction, leaseObtainedStatus, dagActionStore);
}
public <T> T host(DagTaskVisitor<T> visitor) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
index 650938971..f792ace15 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
@@ -70,7 +70,7 @@ public class
GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends G
+ "for action to be completed.", HttpStatus.S_409_CONFLICT);
return;
}
- this.dagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId.toString(), actionType);
+ this.dagActionStore.addFlowDagAction(flowGroup, flowName,
flowExecutionId.toString(), actionType);
} catch (IOException | SQLException e) {
log.warn(
String.format("Failed to add %s action for flow %s %s %s to dag
action store due to:", actionType, flowGroup,
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 00fc8b566..68fded2e0 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -180,6 +180,7 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
String flowGroup = value.getFlowGroup();
String flowName = value.getFlowName();
String flowExecutionId = value.getFlowExecutionId();
+ String jobName = value.getJobName();
produceToConsumeDelayValue = calcMillisSince(produceTimestamp);
log.debug("Processing Dag Action message for flow group: {} name: {}
executionId: {} tid: {} operation: {} lag: {}",
@@ -200,8 +201,7 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
DagActionStore.DagActionType dagActionType =
DagActionStore.DagActionType.valueOf(value.getDagAction().toString());
// Used to easily log information to identify the dag action
- // TODO: add jobName to the dagAction change event
- DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, "",
+ DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName,
dagActionType);
// We only expect INSERT and DELETE operations done to this table. INSERTs
correspond to any type of
@@ -210,12 +210,13 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
if (operation.equals("INSERT")) {
handleDagAction(dagAction, false);
} else if (operation.equals("UPDATE")) {
+ // TODO: change this warning message and process updates if for launch
or reevaluate type
log.warn("Received an UPDATE action to the DagActionStore when values
in this store are never supposed to be "
+ "updated. Flow group: {} name {} executionId {} were updated to
action {}", flowGroup, flowName,
flowExecutionId, dagActionType);
this.unexpectedErrors.mark();
} else if (operation.equals("DELETE")) {
- log.debug("Deleted flow group: {} name: {} executionId {} from
DagActionStore", flowGroup, flowName, flowExecutionId);
+ log.debug("Deleted dagAction from DagActionStore: {}", dagAction);
} else {
log.warn("Received unsupported change type of operation {}. Expected
values to be in [INSERT, UPDATE, DELETE]",
operation);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 9fee0d3f1..3dcc106f5 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -64,6 +64,7 @@ import
org.apache.gobblin.service.modules.orchestration.AbstractUserQuotaManager
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.MysqlDagStateStore;
import
org.apache.gobblin.service.modules.orchestration.ServiceAzkabanConfigKeys;
+import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
import org.apache.gobblin.service.monitoring.GitConfigMonitor;
import org.apache.gobblin.testing.AssertWithBackoff;
@@ -190,6 +191,8 @@ public class GobblinServiceManagerTest {
serviceCoreProperties.put(AbstractUserQuotaManager.PER_USER_QUOTA,
"testUser:1");
transportClientProperties.put(HttpClientFactory.HTTP_REQUEST_TIMEOUT,
"10000");
+
serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY,
true);
+
// Create a bare repository
RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(new
File(GIT_REMOTE_REPO_DIR), FS.DETECTED);
fileKey.open(false).create(true);
@@ -266,6 +269,17 @@ public class GobblinServiceManagerTest {
mysql.stop();
}
+ /**
+ * Tests retrieving two classes initiated by GobblinServiceGuiceModule. One
is explicitly bound and another optionally
+ * through a configuration enabled in the setup method above.
+ */
+ @Test
+ public void testGetClass() {
+
Assert.assertTrue(this.gobblinServiceManager.getClass(FlowCompilationValidationHelper.class)
instanceof FlowCompilationValidationHelper);
+ // Optionally bound config
+ Assert.assertTrue(this.gobblinServiceManager.getClass(FlowCatalog.class)
instanceof FlowCatalog);
+ }
+
/**
* To test an existing flow in a spec store does not get deleted just
because it is not compilable during service restarts
*/
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
new file mode 100644
index 000000000..cb2ac540b
--- /dev/null
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderSchedulerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Date;
+import java.util.List;
+import java.util.function.Supplier;
+
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.Trigger;
+import org.quartz.TriggerUtils;
+import org.quartz.spi.OperableTrigger;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+
+
+public class DagActionReminderSchedulerTest {
+ String flowGroup = "fg";
+ String flowName = "fn";
+ String flowExecutionId = "123";
+ String jobName = "jn";
+ String expectedKey = String.join(".", flowGroup, flowName, flowExecutionId,
jobName,
+ String.valueOf(DagActionStore.DagActionType.LAUNCH));
+ DagActionStore.DagAction launchDagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, jobName,
+ DagActionStore.DagActionType.LAUNCH);
+
+ @Test
+ public void testCreateDagActionReminderKey() {
+ Assert.assertEquals(expectedKey,
DagActionReminderScheduler.createDagActionReminderKey(launchDagAction));
+ }
+
+ @Test
+ public void testCreateReminderJobTrigger() {
+ long reminderDuration = 666L;
+ Supplier<Long> getCurrentTimeMillis = () -> 12345600000L;
+ Trigger reminderTrigger = DagActionReminderScheduler
+ .createReminderJobTrigger(launchDagAction, reminderDuration,
getCurrentTimeMillis);
+ Assert.assertEquals(reminderTrigger.getKey().toString(), flowGroup + "." +
expectedKey);
+ List<Date> fireTimes = TriggerUtils.computeFireTimes((OperableTrigger)
reminderTrigger, null, 1);
+ Assert.assertEquals(fireTimes.get(0), new Date(reminderDuration +
getCurrentTimeMillis.get()));
+ }
+
+ @Test
+ public void testCreateReminderJobDetail() {
+ JobDetail jobDetail =
DagActionReminderScheduler.createReminderJobDetail(launchDagAction);
+ Assert.assertEquals(jobDetail.getKey().toString(), flowGroup + "." +
expectedKey);
+ JobDataMap dataMap = jobDetail.getJobDataMap();
+ Assert.assertEquals(dataMap.get(ConfigurationKeys.FLOW_GROUP_KEY),
flowGroup);
+ Assert.assertEquals(dataMap.get(ConfigurationKeys.FLOW_NAME_KEY),
flowName);
+ Assert.assertEquals(dataMap.get(ConfigurationKeys.FLOW_EXECUTION_ID_KEY),
flowExecutionId);
+ Assert.assertEquals(dataMap.get(ConfigurationKeys.JOB_NAME_KEY), jobName);
+
Assert.assertEquals(dataMap.get(DagActionReminderScheduler.ReminderJob.FLOW_ACTION_TYPE_KEY),
+ DagActionStore.DagActionType.LAUNCH);
+ }
+}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index 3ca35f4a3..2e677f59d 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.service.modules.orchestration;
+import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
@@ -75,7 +76,9 @@ public class DagManagementTaskStreamImplTest {
MostlyMySqlDagManagementStateStore dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config, null, null, null);
dagManagementStateStore.setTopologySpecMap(topologySpecMap);
this.dagManagementTaskStream =
- new DagManagementTaskStreamImpl(config, Optional.empty(),
Optional.of(mock(ReminderSettingDagProcLeaseArbiter.class)));
+ new DagManagementTaskStreamImpl(config,
Optional.of(mock(DagActionStore.class)),
+ mock(MultiActiveLeaseArbiter.class),
Optional.of(mock(DagActionReminderScheduler.class)),
+ false);
this.dagProcFactory = new DagProcFactory(null);
this.dagProcEngineThread = new DagProcessingEngine.DagProcEngineThread(
this.dagManagementTaskStream, this.dagProcFactory,
dagManagementStateStore);
@@ -84,10 +87,11 @@ public class DagManagementTaskStreamImplTest {
/* This tests adding and removal of dag actions from dag task stream with a
launch task. It verifies that the
{@link DagManagementTaskStreamImpl#next()} call blocks until a {@link
LeaseAttemptStatus.LeaseObtainedStatus} is
returned for a particular action.
- TODO: when we have different dag procs in future, update this test to add
other types of actions (and tasks)
+ TODO: when we have different dag procs in the future, update this test to
add other types of actions (and tasks)
*/
@Test
- public void addRemoveDagActions() {
+ public void addRemoveDagActions()
+ throws IOException {
/* Three duplicate actions are added to the task stream, since the first
two calls to lease arbitration will return
statuses that should cause the next() method to continue polling for tasks
before finally providing the
LeaseObtainedStatus to the taskStream to break its loop and return a
newly created dagTask
@@ -96,7 +100,7 @@ public class DagManagementTaskStreamImplTest {
dagManagementTaskStream.addDagAction(launchAction);
dagManagementTaskStream.addDagAction(launchAction);
dagManagementTaskStream.addDagAction(launchAction);
- when(dagManagementTaskStream.getReminderSettingDagProcLeaseArbiter().get()
+ when(dagManagementTaskStream.getDagActionProcessingLeaseArbiter()
.tryAcquireLease(any(DagActionStore.DagAction.class), anyLong(),
anyBoolean(), anyBoolean()))
.thenReturn(new LeaseAttemptStatus.NoLongerLeasingStatus(),
new LeaseAttemptStatus.LeasedToAnotherStatus(launchAction, 15),
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index bf84f890c..0afdbe9e8 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -58,6 +58,10 @@ import org.apache.gobblin.util.ConfigUtils;
import static org.mockito.Mockito.*;
+/**
+ * Tests the state updates (including updating in-memory state and
MysqlDagActionStore) after performing add or cancel
+ * operations by calling addDag, stopDag, kill, and resume. It also tests
flows with and without sla configs.
+ */
public class DagManagerFlowTest {
MockedDagManager dagManager;
int dagNumThreads;
@@ -85,8 +89,8 @@ public class DagManagerFlowTest {
.build();
dagActionStore = new MysqlDagActionStore(config);
- dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.DagActionType.KILL);
- dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId_2,
DagActionStore.DagActionType.RESUME);
+ dagActionStore.addFlowDagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.DagActionType.KILL);
+ dagActionStore.addFlowDagAction(flowGroup, flowName, flowExecutionId_2,
DagActionStore.DagActionType.RESUME);
dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props));
dagManager.dagActionStore = Optional.of(dagActionStore);
dagManager.setActive(true);
@@ -127,7 +131,7 @@ public class DagManagerFlowTest {
// mock add spec
// for very first dag to be added, add dag action to store and check its
deleted by the addDag call
- dagManager.getDagActionStore().get().addDagAction("group0", "flow0",
Long.toString(flowExecutionId1), DagActionStore.DagActionType.LAUNCH);
+ dagManager.getDagActionStore().get().addFlowDagAction("group0", "flow0",
Long.toString(flowExecutionId1), DagActionStore.DagActionType.LAUNCH);
dagManager.addDag(dag1, true, true);
Assert.assertFalse(dagManager.getDagActionStore().get().exists("group0",
"flow0", Long.toString(flowExecutionId1), DagActionStore.DagActionType.LAUNCH));
dagManager.addDag(dag2, true, true);
@@ -336,7 +340,7 @@ public class DagManagerFlowTest {
String flowName = jobConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
// Add kill action to action store and call kill
- dagActionStore.addDagAction(flowGroup, flowName,
String.valueOf(flowExecutionId), DagActionStore.DagActionType.KILL);
+ dagActionStore.addFlowDagAction(flowGroup, flowName,
String.valueOf(flowExecutionId), DagActionStore.DagActionType.KILL);
dagManager.handleKillFlowRequest(flowGroup, flowName, flowExecutionId);
// Check that the kill dag action is removed
@@ -353,7 +357,7 @@ public class DagManagerFlowTest {
// Add resume action to action store and call resume
- dagActionStore.addDagAction(flowGroup, flowName,
String.valueOf(flowExecutionId), DagActionStore.DagActionType.RESUME);
+ dagActionStore.addFlowDagAction(flowGroup, flowName,
String.valueOf(flowExecutionId), DagActionStore.DagActionType.RESUME);
dagManager.handleResumeFlowRequest(flowGroup, flowName, flowExecutionId);
// Check that the resume dag action is removed
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index 99f5bd577..138ddb3a2 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.service.modules.orchestration;
+import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
@@ -44,6 +45,7 @@ import
org.apache.gobblin.service.modules.orchestration.proc.DagProc;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
import org.apache.gobblin.testing.AssertWithBackoff;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@Slf4j
@@ -79,7 +81,8 @@ public class DagProcessingEngineTest {
this.dagManagementStateStore = new
MostlyMySqlDagManagementStateStore(config, null, null, null);
this.dagManagementStateStore.setTopologySpecMap(topologySpecMap);
this.dagManagementTaskStream =
- new DagManagementTaskStreamImpl(config, Optional.empty(), null);
+ new DagManagementTaskStreamImpl(config,
Optional.of(mock(DagActionStore.class)),
+ mock(MultiActiveLeaseArbiter.class), Optional.empty(), false);
this.dagProcFactory = new DagProcFactory(null);
DagProcessingEngine.DagProcEngineThread dagProcEngineThread =
new
DagProcessingEngine.DagProcEngineThread(this.dagManagementTaskStream,
this.dagProcFactory,
@@ -118,7 +121,7 @@ public class DagProcessingEngineTest {
private final boolean isBad;
public MockedDagTask(DagActionStore.DagAction dagAction, boolean isBad) {
- super(dagAction, null);
+ super(dagAction, null, null);
this.isBad = isBad;
}
@@ -136,6 +139,7 @@ public class DagProcessingEngineTest {
static class MockedDagProc extends DagProc<Void, Void> {
private final boolean isBad;
public MockedDagProc(boolean isBad) {
+ super(null);
this.isBad = isBad;
}
@@ -158,11 +162,7 @@ public class DagProcessingEngineTest {
}
@Override
- protected void sendNotification(Void result, EventSubmitter
eventSubmitter) {
- }
-
- @Override
- protected void commit(DagManagementStateStore dagManagementStateStore,
Void result) {
+ protected void sendNotification(Void result, EventSubmitter
eventSubmitter) throws IOException {
}
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
index 2b0edfdec..da0bb737d 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
@@ -62,13 +62,13 @@ public class MysqlDagActionStoreTest {
@Test
public void testAddAction() throws Exception {
- this.mysqlDagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId, jobName, DagActionStore.DagActionType.KILL);
+ this.mysqlDagActionStore.addJobDagAction(flowGroup, flowName,
flowExecutionId, jobName, DagActionStore.DagActionType.KILL);
//Should not be able to add KILL again when previous one exist
Assert.expectThrows(IOException.class,
- () -> this.mysqlDagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId, jobName, DagActionStore.DagActionType.KILL));
+ () -> this.mysqlDagActionStore.addJobDagAction(flowGroup, flowName,
flowExecutionId, jobName, DagActionStore.DagActionType.KILL));
//Should be able to add a RESUME action for same execution as well as KILL
for another execution of the flow
- this.mysqlDagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId, jobName, DagActionStore.DagActionType.RESUME);
- this.mysqlDagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId_2, jobName, DagActionStore.DagActionType.KILL);
+ this.mysqlDagActionStore.addJobDagAction(flowGroup, flowName,
flowExecutionId, jobName, DagActionStore.DagActionType.RESUME);
+ this.mysqlDagActionStore.addJobDagAction(flowGroup, flowName,
flowExecutionId_2, jobName, DagActionStore.DagActionType.KILL);
}
@Test(dependsOnMethods = "testAddAction")
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
index 5b0e4c424..93003a99a 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
@@ -38,13 +38,14 @@ import
org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
@Slf4j
public class MysqlMultiActiveLeaseArbiterTest {
- private static final long EPSILON = 10000;
+ private static final long EPSILON = 10000L;
private static final long MORE_THAN_EPSILON = (long) (EPSILON * 1.1);
- private static final long LINGER = 50000;
+ private static final long LINGER = 50000L;
private static final long MORE_THAN_LINGER = (long) (LINGER * 1.1);
private static final String USER = "testUser";
private static final String PASSWORD = "testPassword";
private static final String TABLE = "mysql_multi_active_lease_arbiter_store";
+ private static final String CONSTANTS_TABLE = "constants_store";
private static final String flowGroup = "testFlowGroup";
private static final String flowGroup2 = "testFlowGroup2";
private static final String flowName = "testFlowName";
@@ -77,7 +78,8 @@ public class MysqlMultiActiveLeaseArbiterTest {
.addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
.addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
.addPrimitive(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX + "." +
ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
-
.addPrimitive(ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY,
TABLE)
+
.addPrimitive(ConfigurationKeys.LEASE_DETERMINATION_STORE_DB_TABLE_KEY, TABLE)
+ .addPrimitive(ConfigurationKeys.MULTI_ACTIVE_CONSTANTS_DB_TABLE_KEY,
CONSTANTS_TABLE)
.build();
this.mysqlMultiActiveLeaseArbiter = new
MysqlMultiActiveLeaseArbiter(config);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index e75b2952e..6b71977a1 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -77,8 +77,10 @@ public class LaunchDagProcTest {
5, "user5",
ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_GROUP_KEY,
ConfigValueFactory.fromAnyRef("fg")));
FlowCompilationValidationHelper flowCompilationValidationHelper =
mock(FlowCompilationValidationHelper.class);
doReturn(com.google.common.base.Optional.of(dag)).when(flowCompilationValidationHelper).createExecutionPlanIfValid(any());
- LaunchDagProc launchDagProc = new LaunchDagProc(new LaunchDagTask(new
DagActionStore.DagAction("fg", "fn",
- "12345", "jn", DagActionStore.DagActionType.LAUNCH), null),
flowCompilationValidationHelper);
+ LaunchDagProc launchDagProc = new LaunchDagProc(
+ new LaunchDagTask(new DagActionStore.DagAction("fg", "fn", "12345",
+ "jn", DagActionStore.DagActionType.LAUNCH), null,
mock(DagActionStore.class)),
+ flowCompilationValidationHelper);
launchDagProc.process(this.dagManagementStateStore);
int expectedNumOfSavingDagNodeStates = 1; // = number of start nodes