This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new a74d17a01 [GOBBLIN-2051] Rework `FlowLaunchHandler`, `DagActionStore`,
and related classes for clarity (#3927)
a74d17a01 is described below
commit a74d17a0123218ac4c867caeefaee2f472b438e7
Author: Kip Kohn <[email protected]>
AuthorDate: Tue Apr 23 14:12:18 2024 -0700
[GOBBLIN-2051] Rework `FlowLaunchHandler`, `DagActionStore`, and related
classes for clarity (#3927)
Rework `FlowLaunchHandler`, `DagActionStore`, and related classes' javadoc
and method naming for clarity
---
.../org/apache/gobblin/scheduler/JobScheduler.java | 6 +-
.../apache/gobblin/scheduler/JobSchedulerTest.java | 4 +-
.../modules/orchestration/DagActionStore.java | 20 ++++-
.../orchestration/DagManagementTaskStreamImpl.java | 2 +-
.../modules/orchestration/FlowLaunchHandler.java | 94 +++++++++++-----------
.../modules/orchestration/LeaseAttemptStatus.java | 23 +++---
.../orchestration/MultiActiveLeaseArbiter.java | 8 +-
.../modules/orchestration/MysqlDagActionStore.java | 6 --
.../MysqlMultiActiveLeaseArbiter.java | 2 +-
.../modules/orchestration/Orchestrator.java | 32 ++++----
.../MysqlMultiActiveLeaseArbiterTest.java | 6 +-
11 files changed, 109 insertions(+), 94 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
index a9761bff3..f988d7648 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
@@ -395,7 +395,7 @@ public class JobScheduler extends AbstractIdleService {
try {
// Schedule the Quartz job with a trigger built from the job
configuration
- Trigger trigger = createTriggerForJob(job.getKey(), jobProps,
Optional.absent());
+ Trigger trigger = createTriggerForJob(job.getKey(), jobProps,
java.util.Optional.empty());
this.scheduler.getScheduler().scheduleJob(job, trigger);
logNewlyScheduledJob(job, trigger);
} catch (SchedulerException se) {
@@ -585,11 +585,11 @@ public class JobScheduler extends AbstractIdleService {
* Get a {@link org.quartz.Trigger} from the given job configuration
properties. If triggerSuffix is provided, appends
* it to the end of the flow name. The suffix is used to add multiple unique
triggers associated with the same job
*/
- public static Trigger createTriggerForJob(JobKey jobKey, Properties
jobProps, Optional<String> triggerSuffix) {
+ public static Trigger createTriggerForJob(JobKey jobKey, Properties
jobProps, java.util.Optional<String> triggerSuffix) {
// Build a trigger for the job with the given cron-style schedule
return TriggerBuilder.newTrigger()
.withIdentity(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)
- + triggerSuffix.transform(s -> "_" + s).or(""),
+ + triggerSuffix.map(s -> "_" + s).orElse(""),
Strings.nullToEmpty(jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY)))
.forJob(jobKey)
.withSchedule(CronScheduleBuilder.cronSchedule(jobProps.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY)))
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/scheduler/JobSchedulerTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/scheduler/JobSchedulerTest.java
index 1d3514804..290575497 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/scheduler/JobSchedulerTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/scheduler/JobSchedulerTest.java
@@ -17,7 +17,7 @@
package org.apache.gobblin.scheduler;
-import com.google.common.base.Optional;
+import java.util.Optional;
import java.util.Properties;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.junit.Assert;
@@ -39,7 +39,7 @@ public class JobSchedulerTest {
jobProps.put(ConfigurationKeys.JOB_GROUP_KEY, jobGroup);
jobProps.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "0/2 * * * * ?");
- Trigger trigger1 = JobScheduler.createTriggerForJob(jobKey, jobProps,
Optional.absent());
+ Trigger trigger1 = JobScheduler.createTriggerForJob(jobKey, jobProps,
Optional.empty());
Trigger trigger2 = JobScheduler.createTriggerForJob(jobKey, jobProps,
Optional.of("suffix"));
Assert.assertFalse(trigger1.getKey().equals(trigger2.getKey()));
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
index eafb05750..a44f7b422 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
@@ -46,6 +46,10 @@ public interface DagActionStore {
final String jobName;
final DagActionType dagActionType;
+ public static DagAction forFlow(String flowGroup, String flowName, String
flowExecutionId, DagActionType dagActionType) {
+ return new DagAction(flowGroup, flowName, flowExecutionId,
NO_JOB_NAME_DEFAULT, dagActionType);
+ }
+
public FlowId getFlowId() {
return new
FlowId().setFlowGroup(this.flowGroup).setFlowName(this.flowName);
}
@@ -90,6 +94,16 @@ public interface DagActionStore {
*/
boolean exists(String flowGroup, String flowName, String flowExecutionId,
DagActionType dagActionType) throws IOException, SQLException;
+ /** Persist the {@link DagAction} in {@link DagActionStore} for durability */
+ default void addDagAction(DagAction dagAction) throws IOException {
+ addJobDagAction(
+ dagAction.getFlowGroup(),
+ dagAction.getFlowName(),
+ dagAction.getFlowExecutionId(),
+ dagAction.getJobName(),
+ dagAction.getDagActionType());
+ }
+
/**
* Persist the dag action in {@link DagActionStore} for durability
* @param flowGroup flow group for the dag action
@@ -109,11 +123,13 @@ public interface DagActionStore {
* @param dagActionType the value of the dag action
* @throws IOException
*/
- void addFlowDagAction(String flowGroup, String flowName, String
flowExecutionId, DagActionType dagActionType) throws IOException;
+ default void addFlowDagAction(String flowGroup, String flowName, String
flowExecutionId, DagActionType dagActionType) throws IOException {
+ addDagAction(DagAction.forFlow(flowGroup, flowName, flowExecutionId,
dagActionType));
+ }
/**
* delete the dag action from {@link DagActionStore}
- * @param DagAction containing all information needed to identify dag and
specific action value
+ * @param dagAction containing all information needed to identify dag and
specific action value
* @throws IOException
* @return true if we successfully delete one record, return false if the
record does not exist
*/
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index 63aca3e8d..9aeab6696 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -177,7 +177,7 @@ public class DagManagementTaskStreamImpl implements
DagManagement, DagTaskStream
*/
protected void scheduleReminderForEvent(LeaseAttemptStatus leaseStatus)
throws SchedulerException {
-
dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getDagAction(),
+
dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getConsensusDagAction(),
leaseStatus.getMinimumLingerDurationMillis());
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
index c41bfe81f..1c7c82358 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
@@ -24,6 +24,7 @@ import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.Locale;
+import java.util.Optional;
import java.util.Properties;
import java.util.Random;
@@ -35,7 +36,6 @@ import org.quartz.Trigger;
import org.quartz.impl.JobDetailImpl;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
import com.typesafe.config.Config;
import javax.inject.Inject;
@@ -68,7 +68,7 @@ import org.apache.gobblin.util.ConfigUtils;
@Slf4j
public class FlowLaunchHandler {
private final MultiActiveLeaseArbiter multiActiveLeaseArbiter;
- private Optional<DagActionStore> dagActionStore;
+ private DagActionStore dagActionStore;
private final MetricContext metricContext;
private final int schedulerMaxBackoffMillis;
private static Random random = new Random();
@@ -80,9 +80,14 @@ public class FlowLaunchHandler {
@Inject
public FlowLaunchHandler(Config config,
@Named(ConfigurationKeys.SCHEDULER_LEASE_ARBITER_NAME)
MultiActiveLeaseArbiter leaseArbiter,
- SchedulerService schedulerService, Optional<DagActionStore>
dagActionStore) {
+ SchedulerService schedulerService,
com.google.common.base.Optional<DagActionStore> optDagActionStore) {
this.multiActiveLeaseArbiter = leaseArbiter;
- this.dagActionStore = dagActionStore;
+
+ if (!optDagActionStore.isPresent()) {
+ throw new RuntimeException("DagActionStore MUST be present for flow
launch handling!");
+ }
+ this.dagActionStore = optDagActionStore.get();
+
this.schedulerMaxBackoffMillis = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_MAX_BACKOFF_MILLIS_KEY,
ConfigurationKeys.DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS);
this.schedulerService = schedulerService;
@@ -99,51 +104,46 @@ public class FlowLaunchHandler {
* This method is used in the multi-active scheduler case for one or more
hosts to respond to a launch dag action
* event triggered by the scheduler by attempting a lease for the launch
event and processing the result depending on
* the status of the attempt.
- * @param jobProps
- * @param dagAction
- * @param eventTimeMillis
- * @param isReminderEvent
- * @param skipFlowExecutionIdReplacement
- * @throws IOException
*/
public void handleFlowLaunchTriggerEvent(Properties jobProps,
DagActionStore.DagAction dagAction,
- long eventTimeMillis, boolean isReminderEvent, boolean
skipFlowExecutionIdReplacement) throws IOException {
- LeaseAttemptStatus
- leaseAttemptStatus = this.multiActiveLeaseArbiter
- .tryAcquireLease(dagAction, eventTimeMillis, isReminderEvent,
skipFlowExecutionIdReplacement);
- if (leaseAttemptStatus instanceof
LeaseAttemptStatus.LeaseObtainedStatus) {
- LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus =
- (LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus;
- if (persistDagAction(leaseObtainedStatus)) {
- log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ",
leaseObtainedStatus.getDagAction(),
- leaseObtainedStatus.getEventTimeMillis());
- return;
- }
- // If persisting the dag action failed, then we set another trigger
for this event to occur immediately to
- // re-attempt handling the event
- scheduleReminderForEvent(jobProps,
- new
LeaseAttemptStatus.LeasedToAnotherStatus(leaseObtainedStatus.getDagAction(),
0L), eventTimeMillis);
- } else if (leaseAttemptStatus instanceof
LeaseAttemptStatus.LeasedToAnotherStatus) {
- scheduleReminderForEvent(jobProps,
(LeaseAttemptStatus.LeasedToAnotherStatus) leaseAttemptStatus,
- eventTimeMillis);
- }
- // Otherwise leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus & no need to do anything
+ long eventTimeMillis, boolean isReminderEvent, boolean
adoptConsensusFlowExecutionId) throws IOException {
+ LeaseAttemptStatus leaseAttempt =
this.multiActiveLeaseArbiter.tryAcquireLease(
+ dagAction, eventTimeMillis, isReminderEvent,
adoptConsensusFlowExecutionId);
+ if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus
+ && persistDagAction((LeaseAttemptStatus.LeaseObtainedStatus)
leaseAttempt)) {
+ log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ",
leaseAttempt.getConsensusDagAction(),
+ ((LeaseAttemptStatus.LeaseObtainedStatus)
leaseAttempt).getEventTimeMillis());
+ } else { // when NOT successfully `persistDagAction`, set a reminder to
re-attempt handling (unless leasing finished)
+
calcLeasedToAnotherStatusForReminder(leaseAttempt).ifPresent(leasedToAnother ->
+ scheduleReminderForEvent(jobProps, leasedToAnother,
eventTimeMillis));
+ }
}
- // Called after obtaining a lease to persist the dag action to {@link
DagActionStore} and mark the lease as done
- private boolean persistDagAction(LeaseAttemptStatus.LeaseObtainedStatus
leaseStatus) {
- if (this.dagActionStore.isPresent()) {
- try {
- DagActionStore.DagAction dagAction = leaseStatus.getDagAction();
- this.dagActionStore.get().addFlowDagAction(dagAction.getFlowGroup(),
dagAction.getFlowName(), dagAction.getFlowExecutionId(),
dagAction.getDagActionType());
- // If the dag action has been persisted to the {@link DagActionStore}
we can close the lease
- this.numFlowsSubmitted.mark();
- return this.multiActiveLeaseArbiter.recordLeaseSuccess(leaseStatus);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
+ /** @return {@link Optional} status for reminding, unless {@link
LeaseAttemptStatus.NoLongerLeasingStatus} (hence nothing to do) */
+ private Optional<LeaseAttemptStatus.LeasedToAnotherStatus>
calcLeasedToAnotherStatusForReminder(LeaseAttemptStatus leaseAttempt) {
+ if (leaseAttempt instanceof LeaseAttemptStatus.NoLongerLeasingStatus) { //
all done: nothing to remind about
+ return Optional.empty();
+ } else if (leaseAttempt instanceof
LeaseAttemptStatus.LeasedToAnotherStatus) { // already have one: just return it
+ return Optional.of((LeaseAttemptStatus.LeasedToAnotherStatus)
leaseAttempt);
+ } else if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus)
{ // remind w/o delay to immediately re-attempt handling
+ return Optional.of(new
LeaseAttemptStatus.LeasedToAnotherStatus(leaseAttempt.getConsensusDagAction(),
0L));
} else {
- throw new RuntimeException("DagActionStore is " +
(this.dagActionStore.isPresent() ? "" : "NOT") + " present.");
+ throw new RuntimeException("unexpected `LeaseAttemptStatus` derived
type: '" + leaseAttempt.getClass().getName() + "' in '" + leaseAttempt + "'");
+ }
+ }
+
+ /**
+ * Called after obtaining a lease to both persist to the {@link
DagActionStore} and
+ * {@link
MultiActiveLeaseArbiter#recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus)}
+ */
+ private boolean persistDagAction(LeaseAttemptStatus.LeaseObtainedStatus
leaseStatus) {
+ try {
+ this.dagActionStore.addDagAction(leaseStatus.getConsensusDagAction());
+ this.numFlowsSubmitted.mark();
+ // after successfully persisting, close the lease
+ return this.multiActiveLeaseArbiter.recordLeaseSuccess(leaseStatus);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}
@@ -156,7 +156,7 @@ public class FlowLaunchHandler {
*/
private void scheduleReminderForEvent(Properties jobProps,
LeaseAttemptStatus.LeasedToAnotherStatus status,
long triggerEventTimeMillis) {
- DagActionStore.DagAction dagAction = status.getDagAction();
+ DagActionStore.DagAction dagAction = status.getConsensusDagAction();
JobKey origJobKey = new
JobKey(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "<<no job name>>"),
jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "<<no job
group>>"));
try {
@@ -196,7 +196,7 @@ public class FlowLaunchHandler {
Trigger reminderTrigger = JobScheduler.createTriggerForJob(reminderJobKey,
getJobPropertiesFromJobDetail(jobDetail),
Optional.of(reminderSuffix));
log.debug("Flow Launch Handler - [{}, eventTimestamp: {}] - attempting to
schedule reminder for event {} with "
- + "reminderJobKey {} and reminderTriggerKey {}",
status.getDagAction(), triggerEventTimeMillis,
+ + "reminderJobKey {} and reminderTriggerKey {}",
status.getConsensusDagAction(), triggerEventTimeMillis,
status.getEventTimeMillis(), reminderJobKey, reminderTrigger.getKey());
this.schedulerService.getScheduler().scheduleJob(jobDetail,
reminderTrigger);
return reminderTrigger;
@@ -258,7 +258,7 @@ public class FlowLaunchHandler {
// Saves the following properties in jobProps to retrieve when the trigger
fires
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY,
String.valueOf(getUTCTimeFromDelayPeriod(delayPeriodMillis)));
- // Use the db laundered timestamp for the reminder to ensure consensus
between hosts. Participant trigger timestamps
+ // Use the db consensus timestamp for the reminder to ensure inter-host
agreement. Participant trigger timestamps
// can differ between participants and be interpreted as a reminder for a
distinct flow trigger which will cause
// excess flows to be triggered by the reminder functionality.
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY,
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
index 65526cb21..f77427892 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
@@ -25,13 +25,18 @@ import lombok.Getter;
/**
- Class used to encapsulate status of lease acquisition attempts made by {@link
MultiActiveLeaseArbiter} and contains
- information specific to the status that results. The {@link
LeaseAttemptStatus#getDagAction} and
- {@link LeaseAttemptStatus#getMinimumLingerDurationMillis} are meant to be
- overridden and used by relevant derived classes.
+ * Hierarchy to convey the specific outcome of attempted lease acquisition via
the {@link MultiActiveLeaseArbiter},
+ * with each derived type carrying outcome-specific status info.
+ *
+ * IMPL. NOTE: {@link LeaseAttemptStatus#getConsensusDagAction} and {@link
LeaseAttemptStatus#getMinimumLingerDurationMillis}
+ * intended for `@Override`.
*/
public abstract class LeaseAttemptStatus {
- public DagActionStore.DagAction getDagAction() {
+ /**
+ * @return the {@link DagActionStore.DagAction}, which may now have an
updated flowExecutionId that MUST henceforth be
+ * used; {@see MultiActiveLeaseArbiter#tryAcquireLease}
+ */
+ public DagActionStore.DagAction getConsensusDagAction() {
return null;
}
@@ -53,7 +58,7 @@ public abstract class LeaseAttemptStatus {
*/
@Data
public static class LeaseObtainedStatus extends LeaseAttemptStatus {
- private final DagActionStore.DagAction dagAction;
+ private final DagActionStore.DagAction consensusDagAction;
private final long leaseAcquisitionTimestamp;
private final long minimumLingerDurationMillis;
@Getter(AccessLevel.NONE)
@@ -63,7 +68,7 @@ public abstract class LeaseAttemptStatus {
* @return event time in millis since epoch for the event of this lease
acquisition
*/
public long getEventTimeMillis() {
- return Long.parseLong(dagAction.getFlowExecutionId());
+ return Long.parseLong(consensusDagAction.getFlowExecutionId());
}
/**
@@ -85,7 +90,7 @@ public abstract class LeaseAttemptStatus {
*/
@Data
public static class LeasedToAnotherStatus extends LeaseAttemptStatus {
- private final DagActionStore.DagAction dagAction;
+ private final DagActionStore.DagAction consensusDagAction;
private final long minimumLingerDurationMillis;
/**
@@ -93,7 +98,7 @@ public abstract class LeaseAttemptStatus {
* @return
*/
public long getEventTimeMillis() {
- return Long.parseLong(dagAction.getFlowExecutionId());
+ return Long.parseLong(consensusDagAction.getFlowExecutionId());
}
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
index 3fb5acb95..ba9c17c75 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
@@ -52,11 +52,11 @@ public interface MultiActiveLeaseArbiter {
* @param eventTimeMillis is the time this dag action was triggered
* @param isReminderEvent true if the dag action event we're checking on is
a reminder event
* @param adoptConsensusFlowExecutionId if true then replaces the dagAction
flowExecutionId returned in
- * LeaseAttemptStatuses with the
consensus eventTime
+ * LeaseAttemptStatuses with the
consensus eventTime, accessed via
+ * {@link
LeaseAttemptStatus#getConsensusDagAction()}
*
- * @return LeaseAttemptStatus, containing a dag action that will have an
updated flow execution id if `
- * adoptConsensusFlowExecutionId` is true. The caller should use the newer
version of the dag action to easily track
- * the action moving forward.
+ * @return {@link LeaseAttemptStatus}, containing, when
`adoptConsensusFlowExecutionId`, a universally-agreed-upon
+ * {@link DagActionStore.DagAction} with a possibly updated ("laundered")
flow execution id that MUST be used thereafter
* @throws IOException
*/
LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction dagAction, long
eventTimeMillis, boolean isReminderEvent,
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
index f0b93e059..1141c0e9c 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
@@ -144,12 +144,6 @@ public class MysqlDagActionStore implements DagActionStore
{
}}, true);
}
- @Override
- public void addFlowDagAction(String flowGroup, String flowName, String
flowExecutionId, DagActionType dagActionType)
- throws IOException {
- addJobDagAction(flowGroup, flowName, flowExecutionId, NO_JOB_NAME_DEFAULT,
dagActionType);
- }
-
@Override
public boolean deleteDagAction(DagAction dagAction) throws IOException {
return
dbStatementExecutor.withPreparedStatement(String.format(DELETE_STATEMENT,
tableName), deleteStatement -> {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 113027b24..4479358ec 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -585,7 +585,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
@Override
public boolean recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus
status)
throws IOException {
- DagActionStore.DagAction dagAction = status.getDagAction();
+ DagActionStore.DagAction dagAction = status.getConsensusDagAction();
return
dbStatementExecutor.withPreparedStatement(String.format(CONDITIONALLY_COMPLETE_LEASE_STATEMENT,
leaseArbiterTableName),
updateStatement -> {
int i = 0;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 7466fb09b..86f1d12a9 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -102,14 +102,14 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
private UserQuotaManager quotaManager;
private final FlowCompilationValidationHelper
flowCompilationValidationHelper;
- private Optional<FlowLaunchHandler> flowTriggerDecorator;
+ private Optional<FlowLaunchHandler> flowLaunchHandler;
private Optional<FlowCatalog> flowCatalog;
@Getter
private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
@Inject
public Orchestrator(Config config, TopologyCatalog topologyCatalog,
DagManager dagManager,
- Optional<Logger> log, FlowStatusGenerator flowStatusGenerator,
Optional<FlowLaunchHandler> flowTriggerDecorator,
+ Optional<Logger> log, FlowStatusGenerator flowStatusGenerator,
Optional<FlowLaunchHandler> flowLaunchHandler,
SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
Optional<FlowCatalog> flowCatalog,
Optional<DagManagementStateStore> dagManagementStateStore,
FlowCompilationValidationHelper flowCompilationValidationHelper) throws
IOException {
@@ -117,7 +117,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
this.topologyCatalog = topologyCatalog;
this.dagManager = dagManager;
this.flowStatusGenerator = flowStatusGenerator;
- this.flowTriggerDecorator = flowTriggerDecorator;
+ this.flowLaunchHandler = flowLaunchHandler;
this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
this.flowCatalog = flowCatalog;
this.flowCompilationValidationHelper = flowCompilationValidationHelper;
@@ -213,24 +213,24 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
sharedFlowMetricsSingleton.addFlowGauge(spec, flowConfig, flowGroup,
flowName);
- Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
- String flowExecutionId =
String.valueOf(FlowUtils.getOrCreateFlowExecutionId(flowSpec));
-
-
- DagActionStore.DagAction launchDagAction =
- new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.LAUNCH);
-
- // If multi-active scheduler is enabled do not pass onto DagManager,
otherwise scheduler forwards it directly
- // Skip flow compilation as well, since we recompile after receiving
event from DagActionStoreChangeMonitor later
- if (flowTriggerDecorator.isPresent()) {
-
- // Adopt consensus flowExecutionId for scheduled flows
- flowTriggerDecorator.get().handleFlowLaunchTriggerEvent(jobProps,
launchDagAction, triggerTimestampMillis, isReminderEvent,
+ // only compile and pass directly to `DagManager` when multi-active NOT
enabled; otherwise recompilation to occur later,
+ // once `DagActionStoreChangeMonitor` subsequently delegates this
`DagActionType.LAUNCH`
+ if (flowLaunchHandler.isPresent()) {
+ DagActionStore.DagAction launchDagAction =
DagActionStore.DagAction.forFlow(
+ flowGroup,
+ flowName,
+ String.valueOf(FlowUtils.getOrCreateFlowExecutionId(flowSpec)),
+ DagActionStore.DagActionType.LAUNCH
+ );
+
+ // `flowSpec.isScheduled()` ==> adopt consensus `flowExecutionId` as
clock drift safeguard, yet w/o disrupting API-layer's ad hoc ID assignment
+ flowLaunchHandler.get().handleFlowLaunchTriggerEvent(jobProps,
launchDagAction, triggerTimestampMillis, isReminderEvent,
flowSpec.isScheduled());
_log.info("Multi-active scheduler finished handling trigger event:
[{}, is: {}, triggerEventTimestamp: {}]",
launchDagAction, isReminderEvent ? "reminder" : "original",
triggerTimestampMillis);
} else {
TimingEvent flowCompilationTimer = new
TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
+ Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
Optional<Dag<JobExecutionPlan>> compiledDagOptional =
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
flowSpec, flowGroup,
flowName, flowMetadata);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
index 603db5e19..48795176e 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
@@ -103,7 +103,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
(LeaseAttemptStatus.LeaseObtainedStatus) firstLaunchStatus;
Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <=
firstObtainedStatus.getLeaseAcquisitionTimestamp());
- Assert.assertTrue(firstObtainedStatus.getDagAction().equals(
+ Assert.assertTrue(firstObtainedStatus.getConsensusDagAction().equals(
new DagActionStore.DagAction(flowGroup, flowName,
String.valueOf(firstObtainedStatus.getEventTimeMillis()),
jobName, DagActionStore.DagActionType.LAUNCH)));
@@ -340,7 +340,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
LeaseAttemptStatus.LeaseObtainedStatus firstObtainedStatus =
(LeaseAttemptStatus.LeaseObtainedStatus) firstLaunchStatus;
Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <=
firstObtainedStatus.getLeaseAcquisitionTimestamp());
- Assert.assertTrue(firstObtainedStatus.getDagAction().equals(
+ Assert.assertTrue(firstObtainedStatus.getConsensusDagAction().equals(
new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.LAUNCH)));
// A second attempt to obtain a lease on the same action should return a
LeasedToAnotherStatus which also contains
@@ -351,7 +351,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
LeaseAttemptStatus.LeasedToAnotherStatus secondLeasedToAnotherStatus =
(LeaseAttemptStatus.LeasedToAnotherStatus) secondLaunchStatus;
Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(),
secondLeasedToAnotherStatus.getEventTimeMillis());
- Assert.assertTrue(firstObtainedStatus.getDagAction().equals(
+ Assert.assertTrue(firstObtainedStatus.getConsensusDagAction().equals(
new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId,
jobName, DagActionStore.DagActionType.LAUNCH)));
}