This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2bc6f8773 [GOBBLIN-2027] move DagActionStore, MultiActiveLeaseArbiter
and their implementation/test classes from gobblin-runtime to gobblin-service
(#3904)
2bc6f8773 is described below
commit 2bc6f8773f09ff7eb31560b5569021bd0fd044c3
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Wed Mar 27 19:40:24 2024 -0700
[GOBBLIN-2027] move DagActionStore, MultiActiveLeaseArbiter and their
implementation/test classes from gobblin-runtime to gobblin-service (#3904)
* move DagActionStore from gobblin-runtime to gobblin-service
* move LeaseAttemptStatus out of MultiActiveLeaseArbiter interface
* add doc
* address review comment
---
.../runtime/DagActionStoreChangeMonitorTest.java | 2 +-
.../modules/core/GobblinServiceGuiceModule.java | 8 +-
.../orchestration/DagActionReminderScheduler.java | 2 -
.../modules/orchestration}/DagActionStore.java | 12 ++-
.../modules/orchestration/DagManagement.java | 4 +-
.../orchestration/DagManagementTaskStreamImpl.java | 12 +--
.../service/modules/orchestration/DagManager.java | 3 +-
.../modules/orchestration/DagManagerUtils.java | 1 -
.../modules/orchestration/FlowLaunchHandler.java | 52 ++++++-----
.../orchestration/InstrumentedLeaseArbiter.java | 14 ++-
.../modules/orchestration/LeaseAttemptStatus.java | 99 ++++++++++++++++++++
.../orchestration}/MultiActiveLeaseArbiter.java | 77 +--------------
.../orchestration}/MysqlDagActionStore.java | 9 +-
.../MysqlMultiActiveLeaseArbiter.java | 25 ++---
.../modules/orchestration/Orchestrator.java | 1 -
.../ReminderSettingDagProcLeaseArbiter.java | 14 ++-
.../modules/orchestration/proc/LaunchDagProc.java | 2 +-
.../modules/orchestration/task/DagTask.java | 8 +-
.../modules/orchestration/task/LaunchDagTask.java | 6 +-
...lowExecutionResourceHandlerWithWarmStandby.java | 2 +-
.../monitoring/DagActionStoreChangeMonitor.java | 2 +-
.../DagActionStoreChangeMonitorFactory.java | 2 +-
.../DagManagementDagActionStoreChangeMonitor.java | 6 +-
...nagementDagActionStoreChangeMonitorFactory.java | 2 +-
.../DagManagementTaskStreamImplTest.java | 8 +-
.../modules/orchestration/DagManagerFlowTest.java | 2 -
.../orchestration/DagProcessingEngineTest.java | 1 -
.../orchestration/FlowLaunchHandlerTest.java | 6 +-
.../orchestration}/MysqlDagActionStoreTest.java | 5 +-
.../MysqlMultiActiveLeaseArbiterTest.java | 103 +++++++++++----------
.../orchestration/proc/LaunchDagProcTest.java | 2 +-
31 files changed, 257 insertions(+), 235 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
index 706f7a8c8..71c97f0e5 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/DagActionStoreChangeMonitorTest.java
@@ -25,7 +25,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
-import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.orchestration.DagManager;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 414055faf..c01ff3e62 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -38,11 +38,11 @@ import com.typesafe.config.Config;
import javax.inject.Singleton;
import org.apache.gobblin.restli.EmbeddedRestliServer;
-import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
-import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
-import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
-import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
+import
org.apache.gobblin.service.modules.orchestration.MultiActiveLeaseArbiter;
+import
org.apache.gobblin.service.modules.orchestration.MysqlMultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.MysqlDagActionStore;
import org.apache.gobblin.runtime.instance.StandardGobblinInstanceLauncher;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
index f1a3109ee..ddcd1fa79 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java
@@ -27,8 +27,6 @@ import org.quartz.impl.StdSchedulerFactory;
import javax.inject.Inject;
-import org.apache.gobblin.runtime.api.DagActionStore;
-
/**
* This class is used to keep track of reminders of pending flow action events
to execute. A host calls the
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
similarity index 92%
rename from
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
rename to
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
index 562b3cb9e..3076486f7 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
@@ -15,14 +15,16 @@
* limitations under the License.
*/
-package org.apache.gobblin.runtime.api;
+package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Collection;
import lombok.Data;
+
import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
public interface DagActionStore {
@@ -54,6 +56,14 @@ public interface DagActionStore {
return new DagAction(this.getFlowGroup(), this.getFlowName(),
String.valueOf(eventTimeMillis), this.getJobName(),
this.getDagActionType());
}
+
+ /**
+ * Creates and returns a {@link DagNodeId} for this DagAction.
+ */
+ public DagNodeId getDagNodeId() {
+ return new DagNodeId(this.flowGroup, this.flowName,
+ Long.parseLong(this.flowExecutionId), this.flowGroup, this.jobName);
+ }
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
index 1e4b17c90..75ee9b60e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java
@@ -19,12 +19,10 @@ package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
-import org.apache.gobblin.runtime.api.DagActionStore;
-
/**
* An interface to provide abstractions for managing operations on Dag.
- * It accepts a {@link
org.apache.gobblin.runtime.api.DagActionStore.DagAction} which can be processed
later.
+ * It accepts a {@link DagActionStore.DagAction} which can be processed later.
* Consumption of the Dags happens through {@link DagTaskStream}.
*/
public interface DagManagement {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index 192243075..a6778c82f 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -33,8 +33,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
-import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.util.ConfigUtils;
@@ -42,7 +40,7 @@ import org.apache.gobblin.util.ConfigUtils;
/**
* DagManagementTaskStreamImpl implements {@link DagManagement} and {@link
DagTaskStream}. It accepts
- * {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}s and
iteratively provides {@link DagTask}.
+ * {@link DagActionStore.DagAction}s and iteratively provides {@link DagTask}.
*/
@Slf4j
@Singleton
@@ -93,14 +91,14 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
throw new RuntimeException("DagManagement not initialized in
multi-active execution mode when required.");
}
try {
- MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = null;
+ LeaseAttemptStatus leaseAttemptStatus = null;
DagActionStore.DagAction dagAction = null;
- while (!(leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus)) {
+ while (!(leaseAttemptStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus)) {
dagAction = this.dagActionQueue.take(); //`take` blocks until an element
is available
// TODO: need to handle reminder events and flag them
leaseAttemptStatus =
this.reminderSettingDagProcLeaseArbiter.get().tryAcquireLease(dagAction,
System.currentTimeMillis(), false, false);
}
- return createDagTask(dagAction,
(MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus);
+ return createDagTask(dagAction, (LeaseAttemptStatus.LeaseObtainedStatus)
leaseAttemptStatus);
} catch (Throwable t) {
//TODO: need to handle exceptions gracefully
log.error("Error getting DagAction from the queue / creating DagTask",
t);
@@ -108,7 +106,7 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
return null;
}
- private DagTask createDagTask(DagActionStore.DagAction dagAction,
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+ private DagTask createDagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus) {
DagActionStore.DagActionType dagActionType = dagAction.getDagActionType();
switch (dagActionType) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 71b8a318a..6ce85cfd3 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -65,7 +65,6 @@ import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
@@ -184,7 +183,7 @@ public class DagManager extends AbstractIdleService {
}
DagActionStore.DagAction toDagAction(DagActionStore.DagActionType
actionType) {
- // defaults to empty jobName
+ // defaults to empty jobName
return new DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, "", actionType);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index decf579aa..c4b62b5ec 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -40,7 +40,6 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecProducer;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
index 1cd5af044..b407c445c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
@@ -17,9 +17,6 @@
package org.apache.gobblin.service.modules.orchestration;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.typesafe.config.Config;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.ZoneId;
@@ -29,28 +26,33 @@ import java.util.Date;
import java.util.Locale;
import java.util.Properties;
import java.util.Random;
+
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.impl.JobDetailImpl;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+
import javax.inject.Inject;
import javax.inject.Named;
import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
-import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
import org.apache.gobblin.scheduler.JobScheduler;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.service.modules.core.GobblinServiceGuiceModule;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.util.ConfigUtils;
-import org.quartz.JobDataMap;
-import org.quartz.JobDetail;
-import org.quartz.JobKey;
-import org.quartz.SchedulerException;
-import org.quartz.Trigger;
-import org.quartz.impl.JobDetailImpl;
/**
@@ -59,7 +61,7 @@ import org.quartz.impl.JobDetailImpl;
* lease owner at a given time for the launch dag action event. After
acquiring the lease, it persists the dag action
* event to the {@link DagActionStore} to be eventually acted upon by
execution module of host(s) to execute the launch.
* Once it has completed persisting the action to the store, it will mark the
lease as completed by calling the
- * {@link
MultiActiveLeaseArbiter#recordLeaseSuccess(MultiActiveLeaseArbiter.LeaseObtainedStatus)}
method. Hosts
+ * {@link
MultiActiveLeaseArbiter#recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus)}
method. Hosts
* that do not gain the lease for the event, instead schedule a reminder using
the {@link SchedulerService} to check
* back in on the previous lease owner's completion status after the lease
should expire to ensure the event is handled
* in failure cases.
@@ -107,12 +109,12 @@ public class FlowLaunchHandler {
public void handleFlowLaunchTriggerEvent(Properties jobProps,
DagActionStore.DagAction dagAction,
long eventTimeMillis, boolean isReminderEvent, boolean
skipFlowExecutionIdReplacement) throws IOException {
if (this.multiActiveLeaseArbiter.isPresent()) {
- MultiActiveLeaseArbiter.LeaseAttemptStatus
+ LeaseAttemptStatus
leaseAttemptStatus = this.multiActiveLeaseArbiter.get()
.tryAcquireLease(dagAction, eventTimeMillis, isReminderEvent,
skipFlowExecutionIdReplacement);
- if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
- MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus =
- (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
+ if (leaseAttemptStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus) {
+ LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus =
+ (LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus;
if (persistDagAction(leaseObtainedStatus)) {
log.info("Successfully persisted lease: [{}, eventTimestamp: {}]
", leaseObtainedStatus.getDagAction(),
leaseObtainedStatus.getEventTimeMillis());
@@ -121,9 +123,9 @@ public class FlowLaunchHandler {
// If persisting the dag action failed, then we set another trigger
for this event to occur immediately to
// re-attempt handling the event
scheduleReminderForEvent(jobProps,
- new
MultiActiveLeaseArbiter.LeasedToAnotherStatus(leaseObtainedStatus.getDagAction(),
0L), eventTimeMillis);
- } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
- scheduleReminderForEvent(jobProps,
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus,
+ new
LeaseAttemptStatus.LeasedToAnotherStatus(leaseObtainedStatus.getDagAction(),
0L), eventTimeMillis);
+ } else if (leaseAttemptStatus instanceof
LeaseAttemptStatus.LeasedToAnotherStatus) {
+ scheduleReminderForEvent(jobProps,
(LeaseAttemptStatus.LeasedToAnotherStatus) leaseAttemptStatus,
eventTimeMillis);
}
// Otherwise leaseAttemptStatus instanceof
LeaseAttemptStatus.NoLongerLeasingStatus & no need to do anything
@@ -134,7 +136,7 @@ public class FlowLaunchHandler {
}
// Called after obtaining a lease to persist the dag action to {@link
DagActionStore} and mark the lease as done
- private boolean persistDagAction(MultiActiveLeaseArbiter.LeaseObtainedStatus
leaseStatus) {
+ private boolean persistDagAction(LeaseAttemptStatus.LeaseObtainedStatus
leaseStatus) {
if (this.dagActionStore.isPresent() &&
this.multiActiveLeaseArbiter.isPresent()) {
try {
DagActionStore.DagAction dagAction = leaseStatus.getDagAction();
@@ -159,7 +161,7 @@ public class FlowLaunchHandler {
* @param status used to extract event to be reminded for and the minimum
time after which reminder should occur
* @param triggerEventTimeMillis the event timestamp we were originally
handling
*/
- private void scheduleReminderForEvent(Properties jobProps,
MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
+ private void scheduleReminderForEvent(Properties jobProps,
LeaseAttemptStatus.LeasedToAnotherStatus status,
long triggerEventTimeMillis) {
DagActionStore.DagAction dagAction = status.getDagAction();
JobKey origJobKey = new
JobKey(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "<<no job name>>"),
@@ -190,7 +192,7 @@ public class FlowLaunchHandler {
* @return Trigger for reminder
* @throws SchedulerException
*/
- protected Trigger createAndScheduleReminder(JobKey origJobKey,
MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
+ protected Trigger createAndScheduleReminder(JobKey origJobKey,
LeaseAttemptStatus.LeasedToAnotherStatus status,
long triggerEventTimeMillis) throws SchedulerException {
// Generate a suffix to differentiate the reminder Job and Trigger from
the original JobKey and Trigger, so we can
// keep track of additional properties needed for reminder
events (all triggers associated with one job
@@ -214,7 +216,7 @@ public class FlowLaunchHandler {
* @return
*/
@VisibleForTesting
- public static String
createSuffixForJobTrigger(MultiActiveLeaseArbiter.LeasedToAnotherStatus
leasedToAnotherStatus) {
+ public static String
createSuffixForJobTrigger(LeaseAttemptStatus.LeasedToAnotherStatus
leasedToAnotherStatus) {
return "reminder_for_" + leasedToAnotherStatus.getEventTimeMillis();
}
@@ -229,7 +231,7 @@ public class FlowLaunchHandler {
* @throws SchedulerException
*/
protected JobDetailImpl createJobDetailForReminderEvent(JobKey originalKey,
JobKey reminderKey,
- MultiActiveLeaseArbiter.LeasedToAnotherStatus status)
+ LeaseAttemptStatus.LeasedToAnotherStatus status)
throws SchedulerException {
JobDetailImpl jobDetail = (JobDetailImpl)
this.schedulerService.getScheduler().getJobDetail(originalKey);
jobDetail.setKey(reminderKey);
@@ -253,7 +255,7 @@ public class FlowLaunchHandler {
*/
@VisibleForTesting
public static JobDataMap updatePropsInJobDataMap(JobDataMap jobDataMap,
- MultiActiveLeaseArbiter.LeasedToAnotherStatus leasedToAnotherStatus, int
schedulerMaxBackoffMillis) {
+ LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus, int
schedulerMaxBackoffMillis) {
Properties prevJobProps = (Properties)
jobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
// Add a small randomization to the minimum reminder wait time to avoid
'thundering herd' issue
long delayPeriodMillis =
leasedToAnotherStatus.getMinimumLingerDurationMillis()
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
index f06def5b1..51f40c04a 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InstrumentedLeaseArbiter.java
@@ -29,8 +29,6 @@ import org.apache.gobblin.metrics.ContextAwareCounter;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
-import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
import org.apache.gobblin.util.ConfigUtils;
@@ -74,24 +72,24 @@ public class InstrumentedLeaseArbiter implements
MultiActiveLeaseArbiter {
}
@Override
- public MultiActiveLeaseArbiter.LeaseAttemptStatus
tryAcquireLease(DagActionStore.DagAction dagAction, long eventTimeMillis,
+ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction
dagAction, long eventTimeMillis,
boolean isReminderEvent, boolean skipFlowExecutionIdReplacement) throws
IOException {
- MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
+ LeaseAttemptStatus leaseAttemptStatus =
decoratedMultiActiveLeaseArbiter.tryAcquireLease(dagAction,
eventTimeMillis, isReminderEvent,
skipFlowExecutionIdReplacement);
log.info("Multi-active scheduler lease attempt for dagAction: {} received
type of leaseAttemptStatus: [{}, "
+ "eventTimestamp: {}] ", dagAction,
leaseAttemptStatus.getClass().getName(), eventTimeMillis);
- if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ if (leaseAttemptStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus) {
if (isReminderEvent) {
this.leasesObtainedDueToReminderCount.mark();
}
this.leaseObtainedCount.inc();
return leaseAttemptStatus;
- } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+ } else if (leaseAttemptStatus instanceof
LeaseAttemptStatus.LeasedToAnotherStatus) {
this.leasedToAnotherStatusCount.inc();
return leaseAttemptStatus;
- } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+ } else if (leaseAttemptStatus instanceof
LeaseAttemptStatus.NoLongerLeasingStatus) {
this.noLongerLeasingStatusCount.inc();
return leaseAttemptStatus;
}
@@ -100,7 +98,7 @@ public class InstrumentedLeaseArbiter implements
MultiActiveLeaseArbiter {
}
@Override
- public boolean recordLeaseSuccess(LeaseObtainedStatus status)
+ public boolean recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus
status)
throws IOException {
if (this.decoratedMultiActiveLeaseArbiter.recordLeaseSuccess(status)) {
this.recordedLeaseSuccessCount.mark();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
new file mode 100644
index 000000000..65526cb21
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+
+import lombok.AccessLevel;
+import lombok.Data;
+import lombok.Getter;
+
+
+/**
+ Class used to encapsulate status of lease acquisition attempts made by {@link
MultiActiveLeaseArbiter} and contains
+ information specific to the status that results. The {@link
LeaseAttemptStatus#getDagAction} and
+ {@link LeaseAttemptStatus#getMinimumLingerDurationMillis} are meant to be
+ overridden and used by relevant derived classes.
+ */
+public abstract class LeaseAttemptStatus {
+ public DagActionStore.DagAction getDagAction() {
+ return null;
+ }
+
+ public long getMinimumLingerDurationMillis() {
+ return 0;
+ }
+
+ /*
+ * This LeaseAttemptStatus tells the caller that work for which lease was
requested is completed and thus lease is no
+ * longer required. There is also no need to set reminder events.
+ */
+ public static class NoLongerLeasingStatus extends LeaseAttemptStatus {}
+
+ /*
+ The participant calling this method acquired the lease for the event in
question. `Dag action`'s flow execution id
+ is the timestamp associated with the lease and the time the caller obtained
the lease is stored within the
+ `leaseAcquisitionTimestamp` field. The `multiActiveLeaseArbiter` reference
is used to recordLeaseSuccess for the
+ current LeaseObtainedStatus via the completeLease method from a caller
without access to the {@link MultiActiveLeaseArbiter}.
+ */
+ @Data
+ public static class LeaseObtainedStatus extends LeaseAttemptStatus {
+ private final DagActionStore.DagAction dagAction;
+ private final long leaseAcquisitionTimestamp;
+ private final long minimumLingerDurationMillis;
+ @Getter(AccessLevel.NONE)
+ private final MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+
+ /**
+ * @return event time in millis since epoch for the event of this lease
acquisition
+ */
+ public long getEventTimeMillis() {
+ return Long.parseLong(dagAction.getFlowExecutionId());
+ }
+
+ /**
+ * Completes the lease referenced by this status object if it has not
expired.
+ * @return true if able to complete lease, false otherwise.
+ * @throws IOException
+ */
+ public boolean completeLease() throws IOException {
+ return multiActiveLeaseArbiter.recordLeaseSuccess(this);
+ }
+ }
+
+ /*
+ This dag action event already has a valid lease owned by another participant.
+ `Dag action`'s flow execution id is the timestamp the lease is associated
with, however the dag action event it
+ corresponds to may be a different and distinct occurrence of the same event.
+ `minimumLingerDurationMillis` is the minimum amount of time to wait before
this participant should return to check if
+ the lease has completed or expired
+ */
+ @Data
+ public static class LeasedToAnotherStatus extends LeaseAttemptStatus {
+ private final DagActionStore.DagAction dagAction;
+ private final long minimumLingerDurationMillis;
+
+ /**
+ * Returns event time in millis since epoch for the event whose lease was
obtained by another participant.
+ * @return
+ */
+ public long getEventTimeMillis() {
+ return Long.parseLong(dagAction.getFlowExecutionId());
+ }
+ }
+}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
similarity index 61%
rename from
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
rename to
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
index 89df850be..3fb5acb95 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
@@ -15,14 +15,10 @@
* limitations under the License.
*/
-package org.apache.gobblin.runtime.api;
+package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
-import lombok.Data;
-import lombok.Getter;
-import lombok.AccessLevel;
-
/**
* This interface defines a generic approach to a non-blocking, multiple
active thread or host system, in which one or
@@ -76,74 +72,5 @@ public interface MultiActiveLeaseArbiter {
* false if failed to update the lease properly, the caller should
continue seeking to acquire the lease as
* if any actions it did successfully accomplish, do not count
*/
- boolean recordLeaseSuccess(LeaseObtainedStatus status) throws IOException;
-
- /*
- Class used to encapsulate status of lease acquisition attempt and
derivations should contain information specific to
- the status that results. The #getDagAction and
#getMinimumLingerDurationMillis are meant to be overriden and used by
- relevant derived classes.
- */
- abstract class LeaseAttemptStatus {
- public DagActionStore.DagAction getDagAction() {
- return null;
- }
-
- public long getMinimumLingerDurationMillis() {
- return 0;
- }
- }
-
- class NoLongerLeasingStatus extends LeaseAttemptStatus {}
-
- /*
- The participant calling this method acquired the lease for the event in
question. `Dag action`'s flow execution id
- is the timestamp associated with the lease and the time the caller obtained
the lease is stored within the
- `leaseAcquisitionTimestamp` field. The `multiActiveLeaseArbiter` reference
is used to recordLeaseSuccess for the
- current LeaseObtainedStatus via the completeLease method from a caller
without access to the {@link MultiActiveLeaseArbiter}.
- */
- @Data
- class LeaseObtainedStatus extends LeaseAttemptStatus {
- private final DagActionStore.DagAction dagAction;
- private final long leaseAcquisitionTimestamp;
- private final long minimumLingerDurationMillis;
- @Getter(AccessLevel.NONE)
- private final MultiActiveLeaseArbiter multiActiveLeaseArbiter;
-
- /**
- * @return event time in millis since epoch for the event of this lease
acquisition
- */
- public long getEventTimeMillis() {
- return Long.parseLong(dagAction.getFlowExecutionId());
- }
-
- /**
- * Completes the lease referenced by this status object if it has not
expired.
- * @return true if able to complete lease, false otherwise.
- * @throws IOException
- */
- public boolean completeLease() throws IOException {
- return multiActiveLeaseArbiter.recordLeaseSuccess(this);
- }
- }
-
- /*
- This dag action event already has a valid lease owned by another participant.
- `Dag action`'s flow execution id is the timestamp the lease is associated
with, however the dag action event it
- corresponds to may be a different and distinct occurrence of the same event.
- `minimumLingerDurationMillis` is the minimum amount of time to wait before
this participant should return to check if
- the lease has completed or expired
- */
- @Data
- class LeasedToAnotherStatus extends LeaseAttemptStatus {
- private final DagActionStore.DagAction dagAction;
- private final long minimumLingerDurationMillis;
-
- /**
- * Returns event time in millis since epoch for the event whose lease was
obtained by another participant.
- * @return
- */
- public long getEventTimeMillis() {
- return Long.parseLong(dagAction.getFlowExecutionId());
- }
-}
+ boolean recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus status)
throws IOException;
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
similarity index 99%
rename from
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
rename to
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
index d02f5333c..1481f097e 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.gobblin.runtime.dag_action_store;
+package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
import java.sql.Connection;
@@ -24,22 +24,21 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
import com.google.inject.Inject;
import com.typesafe.config.Config;
-import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
-
import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.MysqlDataSourceFactory;
-import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.ExponentialBackoff;
import org.apache.gobblin.util.DBStatementExecutor;
+import org.apache.gobblin.util.ExponentialBackoff;
@Slf4j
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
similarity index 97%
rename from
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
rename to
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 88915b368..03142ebbc 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -15,10 +15,8 @@
* limitations under the License.
*/
-package org.apache.gobblin.runtime.api;
+package org.apache.gobblin.service.modules.orchestration;
-import com.google.inject.Inject;
-import com.typesafe.config.Config;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -30,9 +28,14 @@ import java.util.Calendar;
import java.util.Optional;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
import javax.sql.DataSource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.MysqlDataSourceFactory;
@@ -271,7 +274,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
dbEventTimeMillis: {} - A new event trigger "
+ "is being worked on, so this older reminder will be
dropped.", dagAction,
isReminderEvent ? "reminder" : "original", eventTimeMillis,
dbEventTimestamp);
- return new NoLongerLeasingStatus();
+ return new LeaseAttemptStatus.NoLongerLeasingStatus();
}
if (eventTimeMillis > dbEventTimestamp.getTime()) {
// TODO: emit metric here to capture this unexpected behavior
@@ -303,7 +306,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
CASE 2: Same event, lease is valid",
updatedDagAction, isReminderEvent ? "reminder" : "original",
dbCurrentTimestamp.getTime());
// Utilize db timestamp for reminder
- return new LeasedToAnotherStatus(updatedDagAction,
+ return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction,
dbLeaseAcquisitionTimestamp.getTime() + dbLinger -
dbCurrentTimestamp.getTime());
}
DagActionStore.DagAction updatedDagAction =
@@ -311,7 +314,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE
3: Distinct event, lease is valid",
updatedDagAction, isReminderEvent ? "reminder" : "original",
dbCurrentTimestamp.getTime());
// Utilize db lease acquisition timestamp for wait time
- return new LeasedToAnotherStatus(updatedDagAction,
+ return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction,
dbLeaseAcquisitionTimestamp.getTime() + dbLinger -
dbCurrentTimestamp.getTime());
} // Lease is invalid
else if (leaseValidityStatus == 2) {
@@ -332,7 +335,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
if (isWithinEpsilon) {
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
CASE 5: Same event, no longer leasing event"
+ " in db", dagAction, isReminderEvent ? "reminder" :
"original", dbCurrentTimestamp.getTime());
- return new NoLongerLeasingStatus();
+ return new LeaseAttemptStatus.NoLongerLeasingStatus();
}
log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE
6: Distinct event, no longer leasing "
+ "event in db", dagAction, isReminderEvent ? "reminder" :
"original", dbCurrentTimestamp.getTime());
@@ -499,7 +502,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
SelectInfoResult selectInfoResult = getRowInfo(dagAction);
// Another participant won the lease in between
if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) {
- return new NoLongerLeasingStatus();
+ return new LeaseAttemptStatus.NoLongerLeasingStatus();
}
DagActionStore.DagAction updatedDagAction =
adoptConsensusFlowExecutionId ?
dagAction.updateFlowExecutionId(selectInfoResult.eventTimeMillis) : dagAction;
@@ -510,12 +513,12 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
if (numRowsUpdated == 1) {
log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}]
successfully!", updatedDagAction,
isReminderEvent ? "reminder" : "original",
selectInfoResult.eventTimeMillis);
- return new LeaseObtainedStatus(updatedDagAction,
selectInfoResult.getLeaseAcquisitionTimeMillis().get(),
minimumLingerDurationMillis, this);
+ return new LeaseAttemptStatus.LeaseObtainedStatus(updatedDagAction,
selectInfoResult.getLeaseAcquisitionTimeMillis().get(),
minimumLingerDurationMillis, this);
}
log.info("Another participant acquired lease in between for [{}, is: {},
eventTimestamp: {}] - num rows updated: {}",
updatedDagAction, isReminderEvent ? "reminder" : "original",
selectInfoResult.eventTimeMillis, numRowsUpdated);
// Another participant acquired lease in between
- return new LeasedToAnotherStatus(updatedDagAction,
minimumLingerDurationMillis);
+ return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction,
minimumLingerDurationMillis);
}
/**
@@ -579,7 +582,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
}
@Override
- public boolean recordLeaseSuccess(LeaseObtainedStatus status)
+ public boolean recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus
status)
throws IOException {
DagActionStore.DagAction dagAction = status.getDagAction();
return
dbStatementExecutor.withPreparedStatement(String.format(CONDITIONALLY_COMPLETE_LEASE_STATEMENT,
leaseArbiterTableName),
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 14805ee0d..f4977caa2 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -53,7 +53,6 @@ import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/ReminderSettingDagProcLeaseArbiter.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/ReminderSettingDagProcLeaseArbiter.java
index bc24cd0e7..91e7f20b0 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/ReminderSettingDagProcLeaseArbiter.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/ReminderSettingDagProcLeaseArbiter.java
@@ -37,8 +37,6 @@ import javax.inject.Named;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
import org.apache.gobblin.service.modules.core.GobblinServiceGuiceModule;
@@ -47,7 +45,7 @@ import
org.apache.gobblin.service.modules.core.GobblinServiceGuiceModule;
* added capabilities to properly handle the result of attempted ownership
over these flow action events. It uses the
* {@link MultiActiveLeaseArbiter} to determine a single lease owner at a
given event time for a flow action event.
* If the status of the lease ownership attempt is anything other than an
indication the lease has been completed
- * ({@link
org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter.NoLongerLeasingStatus})
then the
+ * ({@link LeaseAttemptStatus.NoLongerLeasingStatus}) then the
* {@link MultiActiveLeaseArbiter#tryAcquireLease} method will set a reminder
for the flow action using
* {@link DagActionReminderScheduler} to reattempt the lease after the current
lease holder's grant would have expired.
*/
@@ -72,17 +70,17 @@ public class ReminderSettingDagProcLeaseArbiter implements
MultiActiveLeaseArbit
* Attempts a lease for a particular job event and sets a reminder to
revisit if the lease has not been completed.
*/
@Override
- public MultiActiveLeaseArbiter.LeaseAttemptStatus
tryAcquireLease(DagActionStore.DagAction dagAction, long eventTimeMillis,
+ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction
dagAction, long eventTimeMillis,
boolean isReminderEvent, boolean skipFlowExecutionIdReplacement) {
if (this.decoratedLeaseArbiter.isPresent()) {
try {
- MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
+ LeaseAttemptStatus leaseAttemptStatus =
this.decoratedLeaseArbiter.get().tryAcquireLease(dagAction,
eventTimeMillis, isReminderEvent,
skipFlowExecutionIdReplacement);
/* Schedule a reminder for the event unless the lease has been completed
to safeguard against the case where even
we, when we might become the lease owner still fail to complete
processing
*/
- if (!(leaseAttemptStatus instanceof NoLongerLeasingStatus)) {
+ if (!(leaseAttemptStatus instanceof
LeaseAttemptStatus.NoLongerLeasingStatus)) {
scheduleReminderForEvent(leaseAttemptStatus);
}
return leaseAttemptStatus;
@@ -95,7 +93,7 @@ public class ReminderSettingDagProcLeaseArbiter implements
MultiActiveLeaseArbit
}
@Override
- public boolean recordLeaseSuccess(LeaseObtainedStatus status)
+ public boolean recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus
status)
throws IOException {
if (!this.decoratedLeaseArbiter.isPresent()) {
throw new RuntimeException(MISSING_OPTIONAL_ERROR_MESSAGE);
@@ -103,7 +101,7 @@ public class ReminderSettingDagProcLeaseArbiter implements
MultiActiveLeaseArbit
return this.decoratedLeaseArbiter.get().recordLeaseSuccess(status);
}
- protected void
scheduleReminderForEvent(MultiActiveLeaseArbiter.LeaseAttemptStatus leaseStatus)
+ protected void scheduleReminderForEvent(LeaseAttemptStatus leaseStatus)
throws SchedulerException {
if (!this.dagActionReminderScheduler.isPresent()) {
throw new RuntimeException(MISSING_OPTIONAL_ERROR_MESSAGE);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
index 3e10b8319..ce9f56705 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -36,7 +36,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
-import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
index a77f3cfcf..237eafdb3 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/DagTask.java
@@ -22,12 +22,12 @@ import java.io.IOException;
import lombok.Getter;
import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
@@ -40,10 +40,10 @@ import
org.apache.gobblin.service.modules.orchestration.proc.DagProc;
@Alpha
public abstract class DagTask {
@Getter public final DagActionStore.DagAction dagAction;
- private final MultiActiveLeaseArbiter.LeaseObtainedStatus
leaseObtainedStatus;
+ private final LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus;
@Getter protected final DagManager.DagId dagId;
- public DagTask(DagActionStore.DagAction dagAction,
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+ public DagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus) {
this.dagAction = dagAction;
this.leaseObtainedStatus = leaseObtainedStatus;
this.dagId = DagManagerUtils.generateDagId(dagAction.getFlowGroup(),
dagAction.getFlowName(), dagAction.getFlowExecutionId());
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
index c83edacea..98ede576b 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/task/LaunchDagTask.java
@@ -17,9 +17,9 @@
package org.apache.gobblin.service.modules.orchestration.task;
-import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
+import org.apache.gobblin.service.modules.orchestration.LeaseAttemptStatus;
/**
@@ -27,7 +27,7 @@ import
org.apache.gobblin.service.modules.orchestration.DagTaskVisitor;
*/
public class LaunchDagTask extends DagTask {
- public LaunchDagTask(DagActionStore.DagAction dagAction,
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus) {
+ public LaunchDagTask(DagActionStore.DagAction dagAction,
LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus) {
super(dagAction, leaseObtainedStatus);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
index 0b989c3b1..650938971 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
@@ -30,7 +30,7 @@ import java.sql.SQLException;
import javax.inject.Named;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
-import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.runtime.util.InjectionNames;
import org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
import org.apache.gobblin.service.FlowStatusId;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 30c55f4e0..00fc8b566 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -41,7 +41,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareMeter;
-import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
index 9d7106373..07e4229f8 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
@@ -26,7 +26,7 @@ import javax.inject.Named;
import javax.inject.Provider;
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.util.InjectionNames;
import org.apache.gobblin.service.modules.orchestration.DagManager;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
index ab3ea9c37..928266602 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java
@@ -24,7 +24,7 @@ import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.metrics.ContextAwareMeter;
-import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.orchestration.DagManagement;
@@ -52,8 +52,8 @@ public class DagManagementDagActionStoreChangeMonitor extends
DagActionStoreChan
}
/**
- * This implementation passes on the {@link
org.apache.gobblin.runtime.api.DagActionStore.DagAction} to the
- * {@link DagManagement} instead of finding a {@link
org.apache.gobblin.runtime.api.FlowSpec} and passing the spec to {@link
Orchestrator}.
+ * This implementation passes on the {@link DagActionStore.DagAction} to
{@link DagManagement} instead of finding a
+ * {@link org.apache.gobblin.runtime.api.FlowSpec} and passing the spec to
{@link Orchestrator}.
*/
@Override
protected void handleDagAction(DagActionStore.DagAction dagAction, boolean
isStartup) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
index 4eeb2bbee..6853ed79f 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitorFactory.java
@@ -26,7 +26,7 @@ import javax.inject.Named;
import javax.inject.Provider;
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.util.InjectionNames;
import org.apache.gobblin.service.modules.orchestration.DagManagement;
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index 68034658c..5a42e05d7 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -32,8 +32,6 @@ import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
@@ -100,9 +98,9 @@ public class DagManagementTaskStreamImplTest {
dagManagementTaskStream.addDagAction(launchAction);
when(dagManagementTaskStream.getReminderSettingDagProcLeaseArbiter().get()
.tryAcquireLease(any(DagActionStore.DagAction.class), anyLong(),
anyBoolean(), anyBoolean()))
- .thenReturn(new MultiActiveLeaseArbiter.NoLongerLeasingStatus(),
- new MultiActiveLeaseArbiter.LeasedToAnotherStatus(launchAction,
15),
- new MultiActiveLeaseArbiter.LeaseObtainedStatus(launchAction, 0,
5, null));
+ .thenReturn(new LeaseAttemptStatus.NoLongerLeasingStatus(),
+ new LeaseAttemptStatus.LeasedToAnotherStatus(launchAction, 15),
+ new LeaseAttemptStatus.LeaseObtainedStatus(launchAction, 0, 5,
null));
DagTask dagTask = dagManagementTaskStream.next();
Assert.assertTrue(dagTask instanceof LaunchDagTask);
DagProc dagProc = dagTask.host(this.dagProcFactory);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index ef7296393..bf84f890c 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -45,10 +45,8 @@ import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.TopologySpec;
-import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.modules.flowgraph.Dag;
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
index 9d9f584a8..b2a72d05b 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java
@@ -38,7 +38,6 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.metrics.event.EventSubmitter;
-import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.orchestration.proc.DagProc;
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
index 5a430191d..655532ab0 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
@@ -19,8 +19,6 @@ package org.apache.gobblin.service.modules.orchestration;
import java.util.Properties;
import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.junit.Assert;
import org.quartz.JobDataMap;
@@ -35,8 +33,8 @@ public class FlowLaunchHandlerTest {
int schedulerBackOffMillis = 10;
DagActionStore.DagAction dagAction = new
DagActionStore.DagAction("flowName", "flowGroup",
String.valueOf(eventToRevisit), "jobName",
DagActionStore.DagActionType.LAUNCH);
- MultiActiveLeaseArbiter.LeasedToAnotherStatus leasedToAnotherStatus =
- new MultiActiveLeaseArbiter.LeasedToAnotherStatus(dagAction,
minimumLingerDurationMillis);
+ LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus =
+ new LeaseAttemptStatus.LeasedToAnotherStatus(dagAction,
minimumLingerDurationMillis);
/**
* Remove first two fields from cron expression representing seconds and
minutes to return truncated cron expression
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStoreTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
similarity index 98%
rename from
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStoreTest.java
rename to
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
index 944cfa027..2b0edfdec 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStoreTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java
@@ -15,14 +15,13 @@
* limitations under the License.
*/
-package org.apache.gobblin.runtime.dag_action_store;
+package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
-
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashSet;
-import org.apache.gobblin.runtime.api.DagActionStore;
+
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
similarity index 84%
rename from
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
rename to
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
index 9247cdbd0..5b0e4c424 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
@@ -15,23 +15,26 @@
* limitations under the License.
*/
-package org.apache.gobblin.runtime.api;
+package org.apache.gobblin.service.modules.orchestration;
-import com.typesafe.config.Config;
import java.io.IOException;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Optional;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+
import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-import static org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter.*;
@Slf4j
public class MysqlMultiActiveLeaseArbiterTest {
@@ -58,9 +61,9 @@ public class MysqlMultiActiveLeaseArbiterTest {
private static final Timestamp dummyTimestamp = new Timestamp(99999);
private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter;
private String formattedAcquireLeaseIfMatchingAllStatement =
-
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT,
TABLE);
+
String.format(MysqlMultiActiveLeaseArbiter.CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT,
TABLE);
private String formattedAcquireLeaseIfFinishedStatement =
- String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT,
TABLE);
+
String.format(MysqlMultiActiveLeaseArbiter.CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT,
TABLE);
// The setup functionality verifies that the initialization of the tables is
done correctly and verifies any SQL
// syntax errors.
@@ -89,11 +92,11 @@ public class MysqlMultiActiveLeaseArbiterTest {
@Test
public void testAcquireLeaseSingleParticipant() throws Exception {
// Tests CASE 1 of acquire lease for a flow action event not present in DB
- MultiActiveLeaseArbiter.LeaseAttemptStatus firstLaunchStatus =
+ LeaseAttemptStatus firstLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false, true);
- Assert.assertTrue(firstLaunchStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus);
- MultiActiveLeaseArbiter.LeaseObtainedStatus firstObtainedStatus =
- (MultiActiveLeaseArbiter.LeaseObtainedStatus) firstLaunchStatus;
+ Assert.assertTrue(firstLaunchStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus);
+ LeaseAttemptStatus.LeaseObtainedStatus firstObtainedStatus =
+ (LeaseAttemptStatus.LeaseObtainedStatus) firstLaunchStatus;
Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <=
firstObtainedStatus.getLeaseAcquisitionTimestamp());
Assert.assertTrue(firstObtainedStatus.getDagAction().equals(
@@ -103,22 +106,22 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Verify that different DagAction types for the same flow can have leases
at the same time
DagActionStore.DagAction killDagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.KILL);
- MultiActiveLeaseArbiter.LeaseAttemptStatus killStatus =
+ LeaseAttemptStatus killStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(killDagAction,
eventTimeMillis, false, true);
- Assert.assertTrue(killStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus);
- MultiActiveLeaseArbiter.LeaseObtainedStatus killObtainedStatus =
- (MultiActiveLeaseArbiter.LeaseObtainedStatus) killStatus;
+ Assert.assertTrue(killStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus);
+ LeaseAttemptStatus.LeaseObtainedStatus killObtainedStatus =
+ (LeaseAttemptStatus.LeaseObtainedStatus) killStatus;
Assert.assertTrue(
killObtainedStatus.getLeaseAcquisitionTimestamp() >=
killObtainedStatus.getEventTimeMillis());
// Tests CASE 2 of acquire lease for a flow action event that already has
a valid lease for the same event in db
// Very little time should have passed if this test directly follows the
one above so this call will be considered
// the same as the previous event
- MultiActiveLeaseArbiter.LeaseAttemptStatus secondLaunchStatus =
+ LeaseAttemptStatus secondLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false, true);
- Assert.assertTrue(secondLaunchStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
- MultiActiveLeaseArbiter.LeasedToAnotherStatus secondLeasedToAnotherStatus =
- (MultiActiveLeaseArbiter.LeasedToAnotherStatus) secondLaunchStatus;
+ Assert.assertTrue(secondLaunchStatus instanceof
LeaseAttemptStatus.LeasedToAnotherStatus);
+ LeaseAttemptStatus.LeasedToAnotherStatus secondLeasedToAnotherStatus =
+ (LeaseAttemptStatus.LeasedToAnotherStatus) secondLaunchStatus;
Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(),
secondLeasedToAnotherStatus.getEventTimeMillis());
Assert.assertTrue(secondLeasedToAnotherStatus.getMinimumLingerDurationMillis()
> 0);
@@ -126,21 +129,21 @@ public class MysqlMultiActiveLeaseArbiterTest {
// valid
// Allow enough time to pass for this trigger to be considered distinct,
but not enough time so the lease expires
Thread.sleep(MORE_THAN_EPSILON);
- MultiActiveLeaseArbiter.LeaseAttemptStatus thirdLaunchStatus =
+ LeaseAttemptStatus thirdLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false, true);
- Assert.assertTrue(thirdLaunchStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
- MultiActiveLeaseArbiter.LeasedToAnotherStatus thirdLeasedToAnotherStatus =
- (MultiActiveLeaseArbiter.LeasedToAnotherStatus) thirdLaunchStatus;
+ Assert.assertTrue(thirdLaunchStatus instanceof
LeaseAttemptStatus.LeasedToAnotherStatus);
+ LeaseAttemptStatus.LeasedToAnotherStatus thirdLeasedToAnotherStatus =
+ (LeaseAttemptStatus.LeasedToAnotherStatus) thirdLaunchStatus;
Assert.assertTrue(thirdLeasedToAnotherStatus.getEventTimeMillis() >
firstObtainedStatus.getEventTimeMillis());
Assert.assertTrue(thirdLeasedToAnotherStatus.getMinimumLingerDurationMillis() <
LINGER);
// Tests CASE 4 of lease out of date
Thread.sleep(LINGER);
- MultiActiveLeaseArbiter.LeaseAttemptStatus fourthLaunchStatus =
+ LeaseAttemptStatus fourthLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false, true);
- Assert.assertTrue(fourthLaunchStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus);
- MultiActiveLeaseArbiter.LeaseObtainedStatus fourthObtainedStatus =
- (MultiActiveLeaseArbiter.LeaseObtainedStatus) fourthLaunchStatus;
+ Assert.assertTrue(fourthLaunchStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus);
+ LeaseAttemptStatus.LeaseObtainedStatus fourthObtainedStatus =
+ (LeaseAttemptStatus.LeaseObtainedStatus) fourthLaunchStatus;
Assert.assertTrue(fourthObtainedStatus.getEventTimeMillis() >
eventTimeMillis + LINGER);
Assert.assertTrue(fourthObtainedStatus.getEventTimeMillis()
<= fourthObtainedStatus.getLeaseAcquisitionTimestamp());
@@ -149,18 +152,18 @@ public class MysqlMultiActiveLeaseArbiterTest {
// done immediately after previous lease obtainment so should be marked as
the same event
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(fourthObtainedStatus));
Assert.assertTrue(System.currentTimeMillis() -
fourthObtainedStatus.getEventTimeMillis() < EPSILON);
- MultiActiveLeaseArbiter.LeaseAttemptStatus fifthLaunchStatus =
+ LeaseAttemptStatus fifthLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false, true);
- Assert.assertTrue(fifthLaunchStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus);
+ Assert.assertTrue(fifthLaunchStatus instanceof
LeaseAttemptStatus.NoLongerLeasingStatus);
// Tests CASE 6 of no longer leasing a distinct event in DB
// Wait so this event is considered distinct and a new lease will be
acquired
Thread.sleep(MORE_THAN_EPSILON);
- MultiActiveLeaseArbiter.LeaseAttemptStatus sixthLaunchStatus =
+ LeaseAttemptStatus sixthLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction,
eventTimeMillis, false, true);
- Assert.assertTrue(sixthLaunchStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus);
- MultiActiveLeaseArbiter.LeaseObtainedStatus sixthObtainedStatus =
- (MultiActiveLeaseArbiter.LeaseObtainedStatus) sixthLaunchStatus;
+ Assert.assertTrue(sixthLaunchStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus);
+ LeaseAttemptStatus.LeaseObtainedStatus sixthObtainedStatus =
+ (LeaseAttemptStatus.LeaseObtainedStatus) sixthLaunchStatus;
Assert.assertTrue(sixthObtainedStatus.getEventTimeMillis()
<= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
}
@@ -225,7 +228,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
DagActionStore.DagAction updatedResumeDagAction =
resumeDagAction.updateFlowExecutionId(
selectInfoResult.getEventTimeMillis());
- boolean markedSuccess =
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
+ boolean markedSuccess =
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new
LeaseAttemptStatus.LeaseObtainedStatus(
updatedResumeDagAction,
selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null));
Assert.assertTrue(markedSuccess);
// Ensure no NPE results from calling this after a lease has been
completed and acquisition timestamp val is NULL
@@ -256,7 +259,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
long olderEventTimestamp = selectInfoResult.getEventTimeMillis() - 1;
LeaseAttemptStatus attemptStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction,
olderEventTimestamp, true, true);
- Assert.assertTrue(attemptStatus instanceof NoLongerLeasingStatus);
+ Assert.assertTrue(attemptStatus instanceof
LeaseAttemptStatus.NoLongerLeasingStatus);
}
/*
@@ -271,8 +274,8 @@ public class MysqlMultiActiveLeaseArbiterTest {
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
LeaseAttemptStatus attemptStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction,
selectInfoResult.getEventTimeMillis(), true, true);
- Assert.assertTrue(attemptStatus instanceof LeasedToAnotherStatus);
- LeasedToAnotherStatus leasedToAnotherStatus = (LeasedToAnotherStatus)
attemptStatus;
+ Assert.assertTrue(attemptStatus instanceof
LeaseAttemptStatus.LeasedToAnotherStatus);
+ LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus =
(LeaseAttemptStatus.LeasedToAnotherStatus) attemptStatus;
Assert.assertEquals(leasedToAnotherStatus.getEventTimeMillis(),
selectInfoResult.getEventTimeMillis());
}
@@ -288,8 +291,8 @@ public class MysqlMultiActiveLeaseArbiterTest {
Thread.sleep(MORE_THAN_LINGER);
LeaseAttemptStatus attemptStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction,
selectInfoResult.getEventTimeMillis(), true, true);
- Assert.assertTrue(attemptStatus instanceof LeaseObtainedStatus);
- LeaseObtainedStatus obtainedStatus = (LeaseObtainedStatus) attemptStatus;
+ Assert.assertTrue(attemptStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus);
+ LeaseAttemptStatus.LeaseObtainedStatus obtainedStatus =
(LeaseAttemptStatus.LeaseObtainedStatus) attemptStatus;
Assert.assertTrue(obtainedStatus.getEventTimeMillis() >
selectInfoResult.getEventTimeMillis());
Assert.assertTrue(obtainedStatus.getLeaseAcquisitionTimestamp() >
selectInfoResult.getLeaseAcquisitionTimeMillis().get().longValue());
}
@@ -307,7 +310,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
DagActionStore.DagAction updatedResumeDagAction =
resumeDagAction.updateFlowExecutionId(
selectInfoResult.getEventTimeMillis());
- boolean markedSuccess =
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
+ boolean markedSuccess =
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new
LeaseAttemptStatus.LeaseObtainedStatus(
updatedResumeDagAction,
selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null));
Assert.assertTrue(markedSuccess);
@@ -316,7 +319,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Now have a reminder event check-in on the completed lease
LeaseAttemptStatus attemptStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction,
selectInfoResult.getEventTimeMillis(), true, true);
- Assert.assertTrue(attemptStatus instanceof NoLongerLeasingStatus);
+ Assert.assertTrue(attemptStatus instanceof
LeaseAttemptStatus.NoLongerLeasingStatus);
}
/*
@@ -327,22 +330,22 @@ public class MysqlMultiActiveLeaseArbiterTest {
@Test
public void testSkipAdoptingConsensusFlowExecutionId() throws IOException {
// Obtain a lease for a new action and verify its flowExecutionId is not
updated
- MultiActiveLeaseArbiter.LeaseAttemptStatus firstLaunchStatus =
+ LeaseAttemptStatus firstLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction2,
eventTimeMillis, false, false);
- Assert.assertTrue(firstLaunchStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus);
- MultiActiveLeaseArbiter.LeaseObtainedStatus firstObtainedStatus =
- (MultiActiveLeaseArbiter.LeaseObtainedStatus) firstLaunchStatus;
+ Assert.assertTrue(firstLaunchStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus);
+ LeaseAttemptStatus.LeaseObtainedStatus firstObtainedStatus =
+ (LeaseAttemptStatus.LeaseObtainedStatus) firstLaunchStatus;
Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <=
firstObtainedStatus.getLeaseAcquisitionTimestamp());
Assert.assertTrue(firstObtainedStatus.getDagAction().equals(
new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.LAUNCH)));
// A second attempt to obtain a lease on the same action should return a
LeasedToAnotherStatus which also contains
// the original flowExecutionId
- MultiActiveLeaseArbiter.LeaseAttemptStatus secondLaunchStatus =
+ LeaseAttemptStatus secondLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction2,
eventTimeMillis, false, false);
- Assert.assertTrue(secondLaunchStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus);
- MultiActiveLeaseArbiter.LeasedToAnotherStatus secondLeasedToAnotherStatus =
- (MultiActiveLeaseArbiter.LeasedToAnotherStatus) secondLaunchStatus;
+ Assert.assertTrue(secondLaunchStatus instanceof
LeaseAttemptStatus.LeasedToAnotherStatus);
+ LeaseAttemptStatus.LeasedToAnotherStatus secondLeasedToAnotherStatus =
+ (LeaseAttemptStatus.LeasedToAnotherStatus) secondLaunchStatus;
Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(),
secondLeasedToAnotherStatus.getEventTimeMillis());
Assert.assertTrue(firstObtainedStatus.getDagAction().equals(
new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.LAUNCH)));
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
index f00e35fbf..e75b2952e 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProcTest.java
@@ -35,7 +35,7 @@ import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.orchestration.DagActionStore;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecExecutor;