This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new a74d17a01 [GOBBLIN-2051] Rework `FlowLaunchHandler`, `DagActionStore`, 
and related classes for clarity (#3927)
a74d17a01 is described below

commit a74d17a0123218ac4c867caeefaee2f472b438e7
Author: Kip Kohn <[email protected]>
AuthorDate: Tue Apr 23 14:12:18 2024 -0700

    [GOBBLIN-2051] Rework `FlowLaunchHandler`, `DagActionStore`, and related 
classes for clarity (#3927)
    
    Rework `FlowLaunchHandler`, `DagActionStore`, and related classes' javadoc 
and method naming for clarity
---
 .../org/apache/gobblin/scheduler/JobScheduler.java |  6 +-
 .../apache/gobblin/scheduler/JobSchedulerTest.java |  4 +-
 .../modules/orchestration/DagActionStore.java      | 20 ++++-
 .../orchestration/DagManagementTaskStreamImpl.java |  2 +-
 .../modules/orchestration/FlowLaunchHandler.java   | 94 +++++++++++-----------
 .../modules/orchestration/LeaseAttemptStatus.java  | 23 +++---
 .../orchestration/MultiActiveLeaseArbiter.java     |  8 +-
 .../modules/orchestration/MysqlDagActionStore.java |  6 --
 .../MysqlMultiActiveLeaseArbiter.java              |  2 +-
 .../modules/orchestration/Orchestrator.java        | 32 ++++----
 .../MysqlMultiActiveLeaseArbiterTest.java          |  6 +-
 11 files changed, 109 insertions(+), 94 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
index a9761bff3..f988d7648 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
@@ -395,7 +395,7 @@ public class JobScheduler extends AbstractIdleService {
 
     try {
       // Schedule the Quartz job with a trigger built from the job 
configuration
-      Trigger trigger = createTriggerForJob(job.getKey(), jobProps, 
Optional.absent());
+      Trigger trigger = createTriggerForJob(job.getKey(), jobProps, 
java.util.Optional.empty());
       this.scheduler.getScheduler().scheduleJob(job, trigger);
       logNewlyScheduledJob(job, trigger);
     } catch (SchedulerException se) {
@@ -585,11 +585,11 @@ public class JobScheduler extends AbstractIdleService {
    * Get a {@link org.quartz.Trigger} from the given job configuration 
properties. If triggerSuffix is provided, appends
    * it to the end of the flow name. The suffix is used to add multiple unique 
triggers associated with the same job
    */
-  public static Trigger createTriggerForJob(JobKey jobKey, Properties 
jobProps, Optional<String> triggerSuffix) {
+  public static Trigger createTriggerForJob(JobKey jobKey, Properties 
jobProps, java.util.Optional<String> triggerSuffix) {
     // Build a trigger for the job with the given cron-style schedule
     return TriggerBuilder.newTrigger()
         .withIdentity(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)
-            + triggerSuffix.transform(s -> "_" + s).or(""),
+            + triggerSuffix.map(s -> "_" + s).orElse(""),
             
Strings.nullToEmpty(jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY)))
         .forJob(jobKey)
         
.withSchedule(CronScheduleBuilder.cronSchedule(jobProps.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY)))
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/scheduler/JobSchedulerTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/scheduler/JobSchedulerTest.java
index 1d3514804..290575497 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/scheduler/JobSchedulerTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/scheduler/JobSchedulerTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.gobblin.scheduler;
 
-import com.google.common.base.Optional;
+import java.util.Optional;
 import java.util.Properties;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.junit.Assert;
@@ -39,7 +39,7 @@ public class JobSchedulerTest {
     jobProps.put(ConfigurationKeys.JOB_GROUP_KEY, jobGroup);
     jobProps.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "0/2 * * * * ?");
 
-    Trigger trigger1 = JobScheduler.createTriggerForJob(jobKey, jobProps, 
Optional.absent());
+    Trigger trigger1 = JobScheduler.createTriggerForJob(jobKey, jobProps, 
Optional.empty());
     Trigger trigger2 = JobScheduler.createTriggerForJob(jobKey, jobProps, 
Optional.of("suffix"));
 
     Assert.assertFalse(trigger1.getKey().equals(trigger2.getKey()));
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
index eafb05750..a44f7b422 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java
@@ -46,6 +46,10 @@ public interface DagActionStore {
     final String jobName;
     final DagActionType dagActionType;
 
+    public static DagAction forFlow(String flowGroup, String flowName, String 
flowExecutionId, DagActionType dagActionType) {
+      return new DagAction(flowGroup, flowName, flowExecutionId, 
NO_JOB_NAME_DEFAULT, dagActionType);
+    }
+
     public FlowId getFlowId() {
       return new 
FlowId().setFlowGroup(this.flowGroup).setFlowName(this.flowName);
     }
@@ -90,6 +94,16 @@ public interface DagActionStore {
    */
   boolean exists(String flowGroup, String flowName, String flowExecutionId, 
DagActionType dagActionType) throws IOException, SQLException;
 
+  /** Persist the {@link DagAction} in {@link DagActionStore} for durability */
+  default void addDagAction(DagAction dagAction) throws IOException {
+    addJobDagAction(
+        dagAction.getFlowGroup(),
+        dagAction.getFlowName(),
+        dagAction.getFlowExecutionId(),
+        dagAction.getJobName(),
+        dagAction.getDagActionType());
+  }
+
   /**
    * Persist the dag action in {@link DagActionStore} for durability
    * @param flowGroup flow group for the dag action
@@ -109,11 +123,13 @@ public interface DagActionStore {
    * @param dagActionType the value of the dag action
    * @throws IOException
    */
-  void addFlowDagAction(String flowGroup, String flowName, String 
flowExecutionId, DagActionType dagActionType) throws IOException;
+  default void addFlowDagAction(String flowGroup, String flowName, String 
flowExecutionId, DagActionType dagActionType) throws IOException {
+    addDagAction(DagAction.forFlow(flowGroup, flowName, flowExecutionId, 
dagActionType));
+  }
 
   /**
    * delete the dag action from {@link DagActionStore}
-   * @param DagAction containing all information needed to identify dag and 
specific action value
+   * @param dagAction containing all information needed to identify dag and 
specific action value
    * @throws IOException
    * @return true if we successfully delete one record, return false if the 
record does not exist
    */
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
index 63aca3e8d..9aeab6696 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java
@@ -177,7 +177,7 @@ public class DagManagementTaskStreamImpl implements 
DagManagement, DagTaskStream
   */
   protected void scheduleReminderForEvent(LeaseAttemptStatus leaseStatus)
       throws SchedulerException {
-    
dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getDagAction(),
+    
dagActionReminderScheduler.get().scheduleReminder(leaseStatus.getConsensusDagAction(),
         leaseStatus.getMinimumLingerDurationMillis());
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
index c41bfe81f..1c7c82358 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
@@ -24,6 +24,7 @@ import java.time.format.DateTimeFormatter;
 import java.time.temporal.ChronoUnit;
 import java.util.Date;
 import java.util.Locale;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
 
@@ -35,7 +36,6 @@ import org.quartz.Trigger;
 import org.quartz.impl.JobDetailImpl;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
 import com.typesafe.config.Config;
 
 import javax.inject.Inject;
@@ -68,7 +68,7 @@ import org.apache.gobblin.util.ConfigUtils;
 @Slf4j
 public class FlowLaunchHandler {
   private final MultiActiveLeaseArbiter multiActiveLeaseArbiter;
-  private Optional<DagActionStore> dagActionStore;
+  private DagActionStore dagActionStore;
   private final MetricContext metricContext;
   private final int schedulerMaxBackoffMillis;
   private static Random random = new Random();
@@ -80,9 +80,14 @@ public class FlowLaunchHandler {
   @Inject
   public FlowLaunchHandler(Config config,
       @Named(ConfigurationKeys.SCHEDULER_LEASE_ARBITER_NAME) 
MultiActiveLeaseArbiter leaseArbiter,
-      SchedulerService schedulerService, Optional<DagActionStore> 
dagActionStore) {
+      SchedulerService schedulerService, 
com.google.common.base.Optional<DagActionStore> optDagActionStore) {
     this.multiActiveLeaseArbiter = leaseArbiter;
-    this.dagActionStore = dagActionStore;
+
+    if (!optDagActionStore.isPresent()) {
+      throw new RuntimeException("DagActionStore MUST be present for flow 
launch handling!");
+    }
+    this.dagActionStore = optDagActionStore.get();
+
     this.schedulerMaxBackoffMillis = ConfigUtils.getInt(config, 
ConfigurationKeys.SCHEDULER_MAX_BACKOFF_MILLIS_KEY,
         ConfigurationKeys.DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS);
     this.schedulerService = schedulerService;
@@ -99,51 +104,46 @@ public class FlowLaunchHandler {
    * This method is used in the multi-active scheduler case for one or more 
hosts to respond to a launch dag action
    * event triggered by the scheduler by attempting a lease for the launch 
event and processing the result depending on
    * the status of the attempt.
-   * @param jobProps
-   * @param dagAction
-   * @param eventTimeMillis
-   * @param isReminderEvent
-   * @param skipFlowExecutionIdReplacement
-   * @throws IOException
    */
   public void handleFlowLaunchTriggerEvent(Properties jobProps, 
DagActionStore.DagAction dagAction,
-      long eventTimeMillis, boolean isReminderEvent, boolean 
skipFlowExecutionIdReplacement) throws IOException {
-      LeaseAttemptStatus
-          leaseAttemptStatus = this.multiActiveLeaseArbiter
-          .tryAcquireLease(dagAction, eventTimeMillis, isReminderEvent, 
skipFlowExecutionIdReplacement);
-      if (leaseAttemptStatus instanceof 
LeaseAttemptStatus.LeaseObtainedStatus) {
-        LeaseAttemptStatus.LeaseObtainedStatus leaseObtainedStatus =
-            (LeaseAttemptStatus.LeaseObtainedStatus) leaseAttemptStatus;
-        if (persistDagAction(leaseObtainedStatus)) {
-          log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", 
leaseObtainedStatus.getDagAction(),
-              leaseObtainedStatus.getEventTimeMillis());
-          return;
-        }
-        // If persisting the dag action failed, then we set another trigger 
for this event to occur immediately to
-        // re-attempt handling the event
-        scheduleReminderForEvent(jobProps,
-            new 
LeaseAttemptStatus.LeasedToAnotherStatus(leaseObtainedStatus.getDagAction(), 
0L), eventTimeMillis);
-      } else if (leaseAttemptStatus instanceof 
LeaseAttemptStatus.LeasedToAnotherStatus) {
-        scheduleReminderForEvent(jobProps, 
(LeaseAttemptStatus.LeasedToAnotherStatus) leaseAttemptStatus,
-            eventTimeMillis);
-      }
-      // Otherwise leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.NoLongerLeasingStatus & no need to do anything
+      long eventTimeMillis, boolean isReminderEvent, boolean 
adoptConsensusFlowExecutionId) throws IOException {
+    LeaseAttemptStatus leaseAttempt = 
this.multiActiveLeaseArbiter.tryAcquireLease(
+        dagAction, eventTimeMillis, isReminderEvent, 
adoptConsensusFlowExecutionId);
+    if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus
+        && persistDagAction((LeaseAttemptStatus.LeaseObtainedStatus) 
leaseAttempt)) {
+      log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", 
leaseAttempt.getConsensusDagAction(),
+          ((LeaseAttemptStatus.LeaseObtainedStatus) 
leaseAttempt).getEventTimeMillis());
+    } else { // when NOT successfully `persistDagAction`, set a reminder to 
re-attempt handling (unless leasing finished)
+      
calcLeasedToAnotherStatusForReminder(leaseAttempt).ifPresent(leasedToAnother ->
+          scheduleReminderForEvent(jobProps, leasedToAnother, 
eventTimeMillis));
+    }
   }
 
-  // Called after obtaining a lease to persist the dag action to {@link 
DagActionStore} and mark the lease as done
-  private boolean persistDagAction(LeaseAttemptStatus.LeaseObtainedStatus 
leaseStatus) {
-    if (this.dagActionStore.isPresent()) {
-      try {
-        DagActionStore.DagAction dagAction = leaseStatus.getDagAction();
-        this.dagActionStore.get().addFlowDagAction(dagAction.getFlowGroup(), 
dagAction.getFlowName(), dagAction.getFlowExecutionId(), 
dagAction.getDagActionType());
-        // If the dag action has been persisted to the {@link DagActionStore} 
we can close the lease
-        this.numFlowsSubmitted.mark();
-        return this.multiActiveLeaseArbiter.recordLeaseSuccess(leaseStatus);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
+  /** @return {@link Optional} status for reminding, unless {@link 
LeaseAttemptStatus.NoLongerLeasingStatus} (hence nothing to do) */
+  private Optional<LeaseAttemptStatus.LeasedToAnotherStatus> 
calcLeasedToAnotherStatusForReminder(LeaseAttemptStatus leaseAttempt) {
+    if (leaseAttempt instanceof LeaseAttemptStatus.NoLongerLeasingStatus) { // 
all done: nothing to remind about
+      return Optional.empty();
+    } else if (leaseAttempt instanceof 
LeaseAttemptStatus.LeasedToAnotherStatus) { // already have one: just return it
+      return Optional.of((LeaseAttemptStatus.LeasedToAnotherStatus) 
leaseAttempt);
+    } else if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus) 
{ // remind w/o delay to immediately re-attempt handling
+      return Optional.of(new 
LeaseAttemptStatus.LeasedToAnotherStatus(leaseAttempt.getConsensusDagAction(), 
0L));
     } else {
-      throw new RuntimeException("DagActionStore is " + 
(this.dagActionStore.isPresent() ? "" : "NOT") + " present.");
+      throw new RuntimeException("unexpected `LeaseAttemptStatus` derived 
type: '" + leaseAttempt.getClass().getName() + "' in '" + leaseAttempt + "'");
+    }
+  }
+
+  /**
+   * Called after obtaining a lease to both persist to the {@link 
DagActionStore} and
+   * {@link 
MultiActiveLeaseArbiter#recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus)}
+   */
+  private boolean persistDagAction(LeaseAttemptStatus.LeaseObtainedStatus 
leaseStatus) {
+    try {
+      this.dagActionStore.addDagAction(leaseStatus.getConsensusDagAction());
+      this.numFlowsSubmitted.mark();
+      // after successfully persisting, close the lease
+      return this.multiActiveLeaseArbiter.recordLeaseSuccess(leaseStatus);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
   }
 
@@ -156,7 +156,7 @@ public class FlowLaunchHandler {
    */
   private void scheduleReminderForEvent(Properties jobProps, 
LeaseAttemptStatus.LeasedToAnotherStatus status,
       long triggerEventTimeMillis) {
-    DagActionStore.DagAction dagAction = status.getDagAction();
+    DagActionStore.DagAction dagAction = status.getConsensusDagAction();
     JobKey origJobKey = new 
JobKey(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "<<no job name>>"),
         jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "<<no job 
group>>"));
     try {
@@ -196,7 +196,7 @@ public class FlowLaunchHandler {
     Trigger reminderTrigger = JobScheduler.createTriggerForJob(reminderJobKey, 
getJobPropertiesFromJobDetail(jobDetail),
         Optional.of(reminderSuffix));
     log.debug("Flow Launch Handler - [{}, eventTimestamp: {}] -  attempting to 
schedule reminder for event {} with "
-            + "reminderJobKey {} and reminderTriggerKey {}", 
status.getDagAction(), triggerEventTimeMillis,
+            + "reminderJobKey {} and reminderTriggerKey {}", 
status.getConsensusDagAction(), triggerEventTimeMillis,
         status.getEventTimeMillis(), reminderJobKey, reminderTrigger.getKey());
     this.schedulerService.getScheduler().scheduleJob(jobDetail, 
reminderTrigger);
     return reminderTrigger;
@@ -258,7 +258,7 @@ public class FlowLaunchHandler {
     // Saves the following properties in jobProps to retrieve when the trigger 
fires
     
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY,
         String.valueOf(getUTCTimeFromDelayPeriod(delayPeriodMillis)));
-    // Use the db laundered timestamp for the reminder to ensure consensus 
between hosts. Participant trigger timestamps
+    // Use the db consensus timestamp for the reminder to ensure inter-host 
agreement. Participant trigger timestamps
     // can differ between participants and be interpreted as a reminder for a 
distinct flow trigger which will cause
     // excess flows to be triggered by the reminder functionality.
     
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY,
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
index 65526cb21..f77427892 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
@@ -25,13 +25,18 @@ import lombok.Getter;
 
 
 /**
- Class used to encapsulate status of lease acquisition attempts made by {@link 
MultiActiveLeaseArbiter} and contains
- information specific to the status that results. The {@link 
LeaseAttemptStatus#getDagAction} and
- {@link LeaseAttemptStatus#getMinimumLingerDurationMillis} are meant to be
- overridden and used by relevant derived classes.
+ * Hierarchy to convey the specific outcome of attempted lease acquisition via 
the {@link MultiActiveLeaseArbiter},
+ * with each derived type carrying outcome-specific status info.
+ *
+ * IMPL. NOTE: {@link LeaseAttemptStatus#getConsensusDagAction} and {@link 
LeaseAttemptStatus#getMinimumLingerDurationMillis}
+ * intended for `@Override`.
  */
 public abstract class LeaseAttemptStatus {
-  public DagActionStore.DagAction getDagAction() {
+  /**
+   * @return the {@link DagActionStore.DagAction}, which may now have an 
updated flowExecutionId that MUST henceforth be
+   * used; {@see MultiActiveLeaseArbiter#tryAcquireLease}
+   */
+  public DagActionStore.DagAction getConsensusDagAction() {
     return null;
   }
 
@@ -53,7 +58,7 @@ public abstract class LeaseAttemptStatus {
   */
   @Data
   public static class LeaseObtainedStatus extends LeaseAttemptStatus {
-    private final DagActionStore.DagAction dagAction;
+    private final DagActionStore.DagAction consensusDagAction;
     private final long leaseAcquisitionTimestamp;
     private final long minimumLingerDurationMillis;
     @Getter(AccessLevel.NONE)
@@ -63,7 +68,7 @@ public abstract class LeaseAttemptStatus {
      * @return event time in millis since epoch for the event of this lease 
acquisition
      */
     public long getEventTimeMillis() {
-      return Long.parseLong(dagAction.getFlowExecutionId());
+      return Long.parseLong(consensusDagAction.getFlowExecutionId());
     }
 
     /**
@@ -85,7 +90,7 @@ public abstract class LeaseAttemptStatus {
    */
   @Data
   public static class LeasedToAnotherStatus extends LeaseAttemptStatus {
-    private final DagActionStore.DagAction dagAction;
+    private final DagActionStore.DagAction consensusDagAction;
     private final long minimumLingerDurationMillis;
 
     /**
@@ -93,7 +98,7 @@ public abstract class LeaseAttemptStatus {
      * @return
      */
     public long getEventTimeMillis() {
-      return Long.parseLong(dagAction.getFlowExecutionId());
+      return Long.parseLong(consensusDagAction.getFlowExecutionId());
     }
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
index 3fb5acb95..ba9c17c75 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java
@@ -52,11 +52,11 @@ public interface MultiActiveLeaseArbiter {
    * @param eventTimeMillis is the time this dag action was triggered
    * @param isReminderEvent true if the dag action event we're checking on is 
a reminder event
    * @param adoptConsensusFlowExecutionId if true then replaces the dagAction 
flowExecutionId returned in
-   *                                      LeaseAttemptStatuses with the 
consensus eventTime
+   *                                      LeaseAttemptStatuses with the 
consensus eventTime, accessed via
+   *                                      {@link 
LeaseAttemptStatus#getConsensusDagAction()}
    *
-   * @return LeaseAttemptStatus, containing a dag action that will have an 
updated flow execution id if `
-   * adoptConsensusFlowExecutionId` is true. The caller should use the newer 
version of the dag action to easily track
-   * the action moving forward.
+   * @return {@link LeaseAttemptStatus}, containing, when 
`adoptConsensusFlowExecutionId`, a universally-agreed-upon
+   * {@link DagActionStore.DagAction} with a possibly updated ("laundered") 
flow execution id that MUST be used thereafter
    * @throws IOException
    */
   LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction dagAction, long 
eventTimeMillis, boolean isReminderEvent,
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
index f0b93e059..1141c0e9c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStore.java
@@ -144,12 +144,6 @@ public class MysqlDagActionStore implements DagActionStore 
{
     }}, true);
   }
 
-  @Override
-  public void addFlowDagAction(String flowGroup, String flowName, String 
flowExecutionId, DagActionType dagActionType)
-      throws IOException {
-    addJobDagAction(flowGroup, flowName, flowExecutionId, NO_JOB_NAME_DEFAULT, 
dagActionType);
-  }
-
   @Override
   public boolean deleteDagAction(DagAction dagAction) throws IOException {
     return 
dbStatementExecutor.withPreparedStatement(String.format(DELETE_STATEMENT, 
tableName), deleteStatement -> {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 113027b24..4479358ec 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -585,7 +585,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   @Override
   public boolean recordLeaseSuccess(LeaseAttemptStatus.LeaseObtainedStatus 
status)
       throws IOException {
-    DagActionStore.DagAction dagAction = status.getDagAction();
+    DagActionStore.DagAction dagAction = status.getConsensusDagAction();
     return 
dbStatementExecutor.withPreparedStatement(String.format(CONDITIONALLY_COMPLETE_LEASE_STATEMENT,
 leaseArbiterTableName),
         updateStatement -> {
           int i = 0;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 7466fb09b..86f1d12a9 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -102,14 +102,14 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
 
   private UserQuotaManager quotaManager;
   private final FlowCompilationValidationHelper 
flowCompilationValidationHelper;
-  private Optional<FlowLaunchHandler> flowTriggerDecorator;
+  private Optional<FlowLaunchHandler> flowLaunchHandler;
   private Optional<FlowCatalog> flowCatalog;
   @Getter
   private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
 
   @Inject
   public Orchestrator(Config config, TopologyCatalog topologyCatalog, 
DagManager dagManager,
-      Optional<Logger> log, FlowStatusGenerator flowStatusGenerator, 
Optional<FlowLaunchHandler> flowTriggerDecorator,
+      Optional<Logger> log, FlowStatusGenerator flowStatusGenerator, 
Optional<FlowLaunchHandler> flowLaunchHandler,
       SharedFlowMetricsSingleton sharedFlowMetricsSingleton, 
Optional<FlowCatalog> flowCatalog,
       Optional<DagManagementStateStore> dagManagementStateStore,
       FlowCompilationValidationHelper flowCompilationValidationHelper) throws 
IOException {
@@ -117,7 +117,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     this.topologyCatalog = topologyCatalog;
     this.dagManager = dagManager;
     this.flowStatusGenerator = flowStatusGenerator;
-    this.flowTriggerDecorator = flowTriggerDecorator;
+    this.flowLaunchHandler = flowLaunchHandler;
     this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
     this.flowCatalog = flowCatalog;
     this.flowCompilationValidationHelper = flowCompilationValidationHelper;
@@ -213,24 +213,24 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
 
       sharedFlowMetricsSingleton.addFlowGauge(spec, flowConfig, flowGroup, 
flowName);
 
-      Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
-      String flowExecutionId = 
String.valueOf(FlowUtils.getOrCreateFlowExecutionId(flowSpec));
-
-
-      DagActionStore.DagAction launchDagAction =
-          new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.NO_JOB_NAME_DEFAULT, DagActionStore.DagActionType.LAUNCH);
-
-      // If multi-active scheduler is enabled do not pass onto DagManager, 
otherwise scheduler forwards it directly
-      // Skip flow compilation as well, since we recompile after receiving 
event from DagActionStoreChangeMonitor later
-      if (flowTriggerDecorator.isPresent()) {
-
-        // Adopt consensus flowExecutionId for scheduled flows
-        flowTriggerDecorator.get().handleFlowLaunchTriggerEvent(jobProps, 
launchDagAction, triggerTimestampMillis, isReminderEvent,
+      // only compile and pass directly to `DagManager` when multi-active NOT 
enabled; otherwise recompilation to occur later,
+      // once `DagActionStoreChangeMonitor` subsequently delegates this 
`DagActionType.LAUNCH`
+      if (flowLaunchHandler.isPresent()) {
+        DagActionStore.DagAction launchDagAction = 
DagActionStore.DagAction.forFlow(
+            flowGroup,
+            flowName,
+            String.valueOf(FlowUtils.getOrCreateFlowExecutionId(flowSpec)),
+            DagActionStore.DagActionType.LAUNCH
+        );
+
+        // `flowSpec.isScheduled()` ==> adopt consensus `flowExecutionId` as 
clock drift safeguard, yet w/o disrupting API-layer's ad hoc ID assignment
+        flowLaunchHandler.get().handleFlowLaunchTriggerEvent(jobProps, 
launchDagAction, triggerTimestampMillis, isReminderEvent,
             flowSpec.isScheduled());
         _log.info("Multi-active scheduler finished handling trigger event: 
[{}, is: {}, triggerEventTimestamp: {}]",
             launchDagAction, isReminderEvent ? "reminder" : "original", 
triggerTimestampMillis);
       } else {
         TimingEvent flowCompilationTimer = new 
TimingEvent(this.eventSubmitter, TimingEvent.FlowTimings.FLOW_COMPILED);
+        Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(flowSpec);
         Optional<Dag<JobExecutionPlan>> compiledDagOptional =
             
this.flowCompilationValidationHelper.validateAndHandleConcurrentExecution(flowConfig,
 flowSpec, flowGroup,
                 flowName, flowMetadata);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
index 603db5e19..48795176e 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
@@ -103,7 +103,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
         (LeaseAttemptStatus.LeaseObtainedStatus) firstLaunchStatus;
     Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <=
         firstObtainedStatus.getLeaseAcquisitionTimestamp());
-    Assert.assertTrue(firstObtainedStatus.getDagAction().equals(
+    Assert.assertTrue(firstObtainedStatus.getConsensusDagAction().equals(
         new DagActionStore.DagAction(flowGroup, flowName, 
String.valueOf(firstObtainedStatus.getEventTimeMillis()),
             jobName, DagActionStore.DagActionType.LAUNCH)));
 
@@ -340,7 +340,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     LeaseAttemptStatus.LeaseObtainedStatus firstObtainedStatus =
         (LeaseAttemptStatus.LeaseObtainedStatus) firstLaunchStatus;
     Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <= 
firstObtainedStatus.getLeaseAcquisitionTimestamp());
-    Assert.assertTrue(firstObtainedStatus.getDagAction().equals(
+    Assert.assertTrue(firstObtainedStatus.getConsensusDagAction().equals(
         new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.LAUNCH)));
 
     // A second attempt to obtain a lease on the same action should return a 
LeasedToAnotherStatus which also contains
@@ -351,7 +351,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     LeaseAttemptStatus.LeasedToAnotherStatus secondLeasedToAnotherStatus =
         (LeaseAttemptStatus.LeasedToAnotherStatus) secondLaunchStatus;
     Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(), 
secondLeasedToAnotherStatus.getEventTimeMillis());
-    Assert.assertTrue(firstObtainedStatus.getDagAction().equals(
+    Assert.assertTrue(firstObtainedStatus.getConsensusDagAction().equals(
         new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.LAUNCH)));
   }
 

Reply via email to