phet commented on code in PR #3700:
URL: https://github.com/apache/gobblin/pull/3700#discussion_r1227693255


##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -96,19 +96,19 @@ public class ConfigurationKeys {
   public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = 
"skip.scheduling.flows.after.num.days";
   public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
   // Scheduler lease determination store configuration
-  public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = 
"multi.active.scheduler.constants.db.table";
-  public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE 
= "multi.active.scheduler.";
-  public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY 
= "scheduler.lease.determination.store.db.table";
-  public static final String 
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = 
"gobblin_scheduler_lease_determination_store";
-  public static final String SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY = 
"reminderEventTimestampMillis";
-  public static final String SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY = 
"newEventTimestampMillis";
-  public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = "";
+  public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = 
"MysqlMultiActiveLeaseArbiter.constantsTable";
+  public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE 
= "MysqlMultiActiveLeaseArbiter.gobblin_multi_active_scheduler_constants_store";
+  public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY 
= "MysqlMultiActiveLeaseArbiter.schedulerLeaseArbiterTable";

Review Comment:
   no biggie, but for the lot of these, I suggest separately defining 
`MYSQL_LEASE_ARBITER_PREFIX` and then prepending that to each of the many keys
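
   A minimal sketch of the prefix suggestion above, assuming the key strings shown in this hunk. The holder class `LeaseArbiterKeysSketch` is hypothetical and exists only to keep the example self-contained; in the PR these constants would stay in `ConfigurationKeys`.

```java
// Hypothetical holder class; only the key strings are taken from the diff above.
final class LeaseArbiterKeysSketch {
  // Shared prefix, defined once so a rename only touches one line.
  static final String MYSQL_LEASE_ARBITER_PREFIX = "MysqlMultiActiveLeaseArbiter";

  static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
      MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
  static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY =
      MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiterTable";
  static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY =
      MYSQL_LEASE_ARBITER_PREFIX + ".epsilonMillis";

  private LeaseArbiterKeysSketch() {}
}
```

   Since the concatenations are compile-time constants, the resulting key strings are identical to the literals in the diff.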



##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -96,19 +96,19 @@ public class ConfigurationKeys {
   public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = 
"skip.scheduling.flows.after.num.days";
   public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
   // Scheduler lease determination store configuration
-  public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = 
"multi.active.scheduler.constants.db.table";
-  public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE 
= "multi.active.scheduler.";
-  public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY 
= "scheduler.lease.determination.store.db.table";
-  public static final String 
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = 
"gobblin_scheduler_lease_determination_store";
-  public static final String SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY = 
"reminderEventTimestampMillis";
-  public static final String SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY = 
"newEventTimestampMillis";
-  public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = "";
+  public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = 
"MysqlMultiActiveLeaseArbiter.constantsTable";
+  public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE 
= "MysqlMultiActiveLeaseArbiter.gobblin_multi_active_scheduler_constants_store";
+  public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY 
= "MysqlMultiActiveLeaseArbiter.schedulerLeaseArbiterTable";
+  public static final String 
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = 
"MysqlMultiActiveLeaseArbiter.gobblin_scheduler_lease_determination_store";
+  public static final String SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY = 
"eventToRevisitTimestampMillis";
+  public static final String SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY = 
"triggerEventTimestampMillis";
+  public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = 
"MysqlMultiActiveLeaseArbiter.epsilonMillis";
   public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 100;

Review Comment:
   of course we'll tune this as we proceed.... still, on considering potential 
causes of late triggers, such as full GC pause, I'd imagine the need for a 
larger value.  I'd probably start w/ 5s



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -19,38 +19,38 @@
 
 import java.io.IOException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.Data;
+
 
 /**
  * This interface defines a generic approach to a non-blocking, multiple 
active thread or host system, in which one or
- * more active instances compete over ownership of a particular flow's event. 
The type of flow event in question does
- * not impact the algorithm other than to uniquely identify the flow event. 
Each instance uses the interface to initiate
- * an attempt at ownership over the flow event and receives a response 
indicating the status of the attempt.
+ * more active participants compete to take responsiblity for a particular 
flow's event. The type of flow event in
+ * question does not impact the algorithm other than to uniquely identify the 
flow event. Each participant uses the
+ * interface to initiate an attempt at ownership over the flow event and 
receives a response indicating the status of
+ * the attempt.
  *
  * At a high level the lease arbiter works as follows:
- *  1. Multiple instances receive knowledge of a flow action event to act upon
- *  2. Each instance attempts to acquire rights or `a lease` to be the sole 
instance acting on the event by calling the
- *      tryAcquireLease method below and receives the resulting status. The 
status indicates whether this instance has
- *        a) acquired the lease -> then this instance will attempt to complete 
the lease
- *        b) another has acquired the lease -> then another will attempt to 
complete the lease
- *        c) flow event no longer needs to be acted upon -> terminal state
- *  3. If another has acquired the lease, then the instance will check back in 
at the time of lease expiry to see if it
- *    needs to attempt the lease again [status (b) above].
- *  4. Once the instance which acquired the lease completes its work on the 
flow event, it calls completeLeaseUse to
- *    indicate to all other instances that the flow event no longer needs to 
be acted upon [status (c) above]
+ *  1. Multiple participants independently learn of a flow action event to act 
upon
+ *  2. Each participant attempts to acquire rights or `a lease` to be the sole 
participant acting on the event by
+ *     calling the tryAcquireLease method below and receives the resulting 
status. The status indicates whether this
+ *     participant has
+ *        a) LeaseObtainedStatus -> this participant will attempt to carry out 
the required action before the lease expires
+ *        b) LeasedToAnotherStatus -> another will attempt to carry out the 
required action before the lease expires
+ *        c) NoLongerLeasingStatus -> flow event no longer needs to be acted 
upon or terminal state

Review Comment:
   nit: "...acted upon (terminal state)"



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -62,7 +62,38 @@ public interface MultiActiveLeaseArbiter {
    * leaseAcquisitionTimeMillis values have not changed since this owner 
acquired the lease (indicating the lease did
    * not expire).
    * @return true if successfully updated, indicating no further actions need 
to be taken regarding this event.
+   *         false if failed to update the lease properly, the caller should 
continue seeking to acquire the lease as
+   *         if any actions it did successfully accomplish, do not count
    */
-  boolean completeLeaseUse(DagActionStore.DagAction flowAction, long 
eventTimeMillis, long leaseAcquisitionTimeMillis)
-      throws IOException;
+  boolean recordLeaseSuccess(DagActionStore.DagAction flowAction, 
LeaseObtainedStatus status) throws IOException;
+
+  /*
+   Object used to encapsulate status of lease acquisition attempt and derived 
should contain information specific to

Review Comment:
   nits:
   this (and all those below) should really be class javadoc to show up in that 
tool
   a class (not an object)
   "derived [classes/types]" OR "derivations"



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -62,7 +62,38 @@ public interface MultiActiveLeaseArbiter {
    * leaseAcquisitionTimeMillis values have not changed since this owner 
acquired the lease (indicating the lease did
    * not expire).
    * @return true if successfully updated, indicating no further actions need 
to be taken regarding this event.
+   *         false if failed to update the lease properly, the caller should 
continue seeking to acquire the lease as
+   *         if any actions it did successfully accomplish, do not count
    */
-  boolean completeLeaseUse(DagActionStore.DagAction flowAction, long 
eventTimeMillis, long leaseAcquisitionTimeMillis)
-      throws IOException;
+  boolean recordLeaseSuccess(DagActionStore.DagAction flowAction, 
LeaseObtainedStatus status) throws IOException;

Review Comment:
   nit: to streamline this interface, I'd personally tunnel the `flowAction` as 
a package-private member of `LeaseObtainedStatus`.  thus, the core 
identification of what to record can be derived from that single param.
   
   in a way this is also more robust, since one `status` could never be 
mismatched w/ an unrelated `flowAction`.
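
   A rough sketch of that shape, written without Lombok so it stands alone. The outer class `TunnelSketch`, the simplified `DagAction`, and the `Arbiter` interface are hypothetical stand-ins, not the PR's actual types.

```java
import java.io.IOException;

// Hypothetical, simplified stand-ins for the PR's types.
final class TunnelSketch {
  // Reduced version of DagActionStore.DagAction, for illustration only.
  static final class DagAction {
    final String flowGroup;
    final String flowName;

    DagAction(String flowGroup, String flowName) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
    }
  }

  abstract static class LeaseAttemptStatus {}

  static final class LeaseObtainedStatus extends LeaseAttemptStatus {
    // No access modifier: visible only within the package, so callers outside
    // it cannot pair this status with a different flow action.
    final DagAction flowAction;
    private final long eventTimestamp;
    private final long leaseAcquisitionTimestamp;

    LeaseObtainedStatus(DagAction flowAction, long eventTimestamp, long leaseAcquisitionTimestamp) {
      this.flowAction = flowAction;
      this.eventTimestamp = eventTimestamp;
      this.leaseAcquisitionTimestamp = leaseAcquisitionTimestamp;
    }

    long getEventTimestamp() { return eventTimestamp; }
    long getLeaseAcquisitionTimestamp() { return leaseAcquisitionTimestamp; }
  }

  // The method takes only the status; the flow action is read from it.
  interface Arbiter {
    boolean recordLeaseSuccess(LeaseObtainedStatus status) throws IOException;
  }

  private TunnelSketch() {}
}
```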



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java:
##########
@@ -35,17 +36,12 @@ enum FlowActionType {
   }
 
   @Data
+  @AllArgsConstructor
   class DagAction {
     String flowGroup;
     String flowName;
     String flowExecutionId;
     FlowActionType flowActionType;

Review Comment:
   first off, this is beautiful... nearly looks like we're writing scala! ;p
   
   secondly, in what situations are we expecting the need to mutate the fields 
(i.e. why can't they be `final`?)?  in general `@Data` / POJOs work quite well 
being completely immutable.  (which is why such classes may not require 
`@AllArgsConstructor` in addition to `@Data`.)
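
   To illustrate the immutable variant: the hand-written class below is roughly what I'd expect `@Data` to yield once every field is `final` (an all-fields constructor, getters, `equals`/`hashCode`, no setters). It is a sketch without Lombok on the classpath; the outer class name and the enum constants are placeholders, not taken from the PR.

```java
import java.util.Objects;

// Hypothetical wrapper so the example compiles on its own.
final class ImmutableDagActionSketch {
  // Placeholder constants; the real FlowActionType values are not shown in this hunk.
  enum FlowActionType { PLACEHOLDER_A, PLACEHOLDER_B }

  static final class DagAction {
    private final String flowGroup;
    private final String flowName;
    private final String flowExecutionId;
    private final FlowActionType flowActionType;

    // With all fields final, a single constructor covers every field.
    DagAction(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
      this.flowActionType = flowActionType;
    }

    String getFlowGroup() { return flowGroup; }
    String getFlowName() { return flowName; }
    String getFlowExecutionId() { return flowExecutionId; }
    FlowActionType getFlowActionType() { return flowActionType; }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof DagAction)) return false;
      DagAction other = (DagAction) o;
      return Objects.equals(flowGroup, other.flowGroup)
          && Objects.equals(flowName, other.flowName)
          && Objects.equals(flowExecutionId, other.flowExecutionId)
          && flowActionType == other.flowActionType;
    }

    @Override
    public int hashCode() {
      return Objects.hash(flowGroup, flowName, flowExecutionId, flowActionType);
    }
  }

  private ImmutableDagActionSketch() {}
}
```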



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -62,7 +62,38 @@ public interface MultiActiveLeaseArbiter {
    * leaseAcquisitionTimeMillis values have not changed since this owner 
acquired the lease (indicating the lease did
    * not expire).
    * @return true if successfully updated, indicating no further actions need 
to be taken regarding this event.
+   *         false if failed to update the lease properly, the caller should 
continue seeking to acquire the lease as
+   *         if any actions it did successfully accomplish, do not count
    */
-  boolean completeLeaseUse(DagActionStore.DagAction flowAction, long 
eventTimeMillis, long leaseAcquisitionTimeMillis)
-      throws IOException;
+  boolean recordLeaseSuccess(DagActionStore.DagAction flowAction, 
LeaseObtainedStatus status) throws IOException;
+
+  /*
+   Object used to encapsulate status of lease acquisition attempt and derived 
should contain information specific to
+   the status that results.
+   */
+  abstract class LeaseAttemptStatus {}
+
+  class NoLongerLeasingStatus extends LeaseAttemptStatus {}
+
+  /*
+  The participant calling this method acquired the lease for the event in 
question. The class contains the `eventTimestamp`
+  associated with the lease as well as the time the caller obtained the lease 
or `leaseAcquisitionTimestamp`.
+  */
+  @Data
+  class LeaseObtainedStatus extends LeaseAttemptStatus {
+    private final long eventTimestamp;
+    private final long leaseAcquisitionTimestamp;
+  }
+
+  /*
+  This flow action event already has a valid lease owned by another host.
+   */
+  @Data
+  class LeasedToAnotherStatus extends LeaseAttemptStatus {
+    // the timestamp the lease is associated with, but it may be a different 
timestamp for the same flow action
+    // (a previous participant of the event)
+    private final long eventTimeMillis;
+    // the minimum amount of time to wait before returning to check if the 
lease has completed or expired
+    private final long minimumLingerDurationMillis;

Review Comment:
   better to describe these in the class doc to cause them to show up in 
javadoc HTML and IDE help, etc.
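
   Something along these lines, as a sketch only: the field descriptions are lifted from the inline comments in the hunk, while the outer class `JavadocSketch` and the hand-written constructor and getters (standing in for `@Data`) are assumptions made to keep the example compilable by itself.

```java
// Hypothetical wrapper; in the PR these classes are nested in MultiActiveLeaseArbiter.
final class JavadocSketch {
  /**
   * Base class encapsulating the status of a lease acquisition attempt. Derived classes
   * carry the information specific to the status that results.
   */
  abstract static class LeaseAttemptStatus {}

  /**
   * The flow action event already has a valid lease owned by another host.
   * <ul>
   *   <li>{@code eventTimeMillis}: the timestamp the lease is associated with, which may be a
   *       different timestamp for the same flow action</li>
   *   <li>{@code minimumLingerDurationMillis}: the minimum amount of time to wait before
   *       returning to check if the lease has completed or expired</li>
   * </ul>
   */
  static final class LeasedToAnotherStatus extends LeaseAttemptStatus {
    private final long eventTimeMillis;
    private final long minimumLingerDurationMillis;

    LeasedToAnotherStatus(long eventTimeMillis, long minimumLingerDurationMillis) {
      this.eventTimeMillis = eventTimeMillis;
      this.minimumLingerDurationMillis = minimumLingerDurationMillis;
    }

    long getEventTimeMillis() { return eventTimeMillis; }
    long getMinimumLingerDurationMillis() { return minimumLingerDurationMillis; }
  }

  private JavadocSketch() {}
}
```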



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -30,62 +29,60 @@
 import org.quartz.JobKey;
 import org.quartz.SchedulerException;
 import org.quartz.Trigger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.typesafe.config.Config;
 
 import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.LeaseAttemptStatus;
 import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
 import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
 import org.apache.gobblin.scheduler.JobScheduler;
 import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.runtime.api.LeaseObtainedStatus;
-import org.apache.gobblin.runtime.api.LeasedToAnotherStatus;
 
 
 /**
  * Handler used to coordinate multiple hosts with enabled schedulers to 
respond to flow action events. It uses the
- * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter} to 
determine a single lease owner at a given time
+ * {@link MysqlMultiActiveLeaseArbiter} to determine a single lease owner at a 
given time
  * for a flow action event. After acquiring the lease, it persists the flow 
action event to the {@link DagActionStore}
  * to be eventually acted upon by the host with the active DagManager. Once it 
has completed this action, it will mark
  * the lease as completed by calling the
- * {@link 
org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter.completeLeaseUse} 
method. Hosts that do not gain
+ * MysqlMultiActiveLeaseArbiter.recordLeaseSuccess method. Hosts that do not 
gain
  * the lease for the event, instead schedule a reminder using the {@link 
SchedulerService} to check back in on the
  * previous lease owner's completion status after the lease should expire to 
ensure the event is handled in failure
  * cases.
  */
-public class SchedulerLeaseAlgoHandler {
-  private static final Logger LOG = 
LoggerFactory.getLogger(SchedulerLeaseAlgoHandler.class);
-  private final int staggerUpperBoundSec;
+@Slf4j
+public class FlowTriggerHandler {
+  private final int schedulerMaxBackoffMillis;
   private static Random random = new Random();
   protected MultiActiveLeaseArbiter multiActiveLeaseArbiter;
   protected JobScheduler jobScheduler;
   protected SchedulerService schedulerService;
   protected DagActionStore dagActionStore;
   private MetricContext metricContext;
   private ContextAwareMeter numLeasesCompleted;
+
   @Inject
-  public SchedulerLeaseAlgoHandler(Config config, MultiActiveLeaseArbiter 
leaseDeterminationStore,
+  // TODO: should multiActiveLeaseArbiter and DagActionStore be optional?

Review Comment:
   I wouldn't think so... but do elaborate on your thought process



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -30,62 +29,60 @@
 import org.quartz.JobKey;
 import org.quartz.SchedulerException;
 import org.quartz.Trigger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.typesafe.config.Config;
 
 import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.LeaseAttemptStatus;
 import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
 import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
 import org.apache.gobblin.scheduler.JobScheduler;
 import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.runtime.api.LeaseObtainedStatus;
-import org.apache.gobblin.runtime.api.LeasedToAnotherStatus;
 
 
 /**
  * Handler used to coordinate multiple hosts with enabled schedulers to 
respond to flow action events. It uses the
- * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter} to 
determine a single lease owner at a given time
+ * {@link MysqlMultiActiveLeaseArbiter} to determine a single lease owner at a 
given time
  * for a flow action event. After acquiring the lease, it persists the flow 
action event to the {@link DagActionStore}
  * to be eventually acted upon by the host with the active DagManager. Once it 
has completed this action, it will mark
  * the lease as completed by calling the
- * {@link 
org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter.completeLeaseUse} 
method. Hosts that do not gain
+ * MysqlMultiActiveLeaseArbiter.recordLeaseSuccess method. Hosts that do not 
gain
  * the lease for the event, instead schedule a reminder using the {@link 
SchedulerService} to check back in on the
  * previous lease owner's completion status after the lease should expire to 
ensure the event is handled in failure
  * cases.
  */
-public class SchedulerLeaseAlgoHandler {
-  private static final Logger LOG = 
LoggerFactory.getLogger(SchedulerLeaseAlgoHandler.class);
-  private final int staggerUpperBoundSec;
+@Slf4j
+public class FlowTriggerHandler {
+  private final int schedulerMaxBackoffMillis;
   private static Random random = new Random();
   protected MultiActiveLeaseArbiter multiActiveLeaseArbiter;
   protected JobScheduler jobScheduler;
   protected SchedulerService schedulerService;
   protected DagActionStore dagActionStore;
   private MetricContext metricContext;
   private ContextAwareMeter numLeasesCompleted;
+
   @Inject
-  public SchedulerLeaseAlgoHandler(Config config, MultiActiveLeaseArbiter 
leaseDeterminationStore,
+  // TODO: should multiActiveLeaseArbiter and DagActionStore be optional?
+  public FlowTriggerHandler(Config config, MultiActiveLeaseArbiter 
leaseDeterminationStore,
       JobScheduler jobScheduler, SchedulerService schedulerService, 
DagActionStore dagActionStore) {
-    this.staggerUpperBoundSec = ConfigUtils.getInt(config,
-        ConfigurationKeys.SCHEDULER_STAGGERING_UPPER_BOUND_SEC_KEY,
-        ConfigurationKeys.DEFAULT_SCHEDULER_STAGGERING_UPPER_BOUND_SEC);
+    this.schedulerMaxBackoffMillis = ConfigUtils.getInt(config, 
ConfigurationKeys.SCHEDULER_MAX_BACKOFF_MILLIS_KEY,
+        ConfigurationKeys.DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS);
     this.multiActiveLeaseArbiter = leaseDeterminationStore;
     this.jobScheduler = jobScheduler;
     this.schedulerService = schedulerService;
     this.dagActionStore = dagActionStore;
     this.metricContext = Instrumented.getMetricContext(new 
org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
         this.getClass());
-    this.numLeasesCompleted = 
metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_SCHEDULER_LEASE_ALGO_HANDLER_NUM_LEASES_COMPLETED);
+    this.numLeasesCompleted = 
metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_SCHEDULER_LEASE_ALGO_HANDLER_NUM_FLOWS_SUBMITTED);

Review Comment:
   looks like the constant needs renaming to 
`GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_SUBMITTED`
   
   (shall we update `numLeasesCompleted` as well?)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -30,62 +29,60 @@
 import org.quartz.JobKey;
 import org.quartz.SchedulerException;
 import org.quartz.Trigger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.typesafe.config.Config;
 
 import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.LeaseAttemptStatus;
 import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
 import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
 import org.apache.gobblin.scheduler.JobScheduler;
 import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.runtime.api.LeaseObtainedStatus;
-import org.apache.gobblin.runtime.api.LeasedToAnotherStatus;
 
 
 /**
  * Handler used to coordinate multiple hosts with enabled schedulers to 
respond to flow action events. It uses the
- * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter} to 
determine a single lease owner at a given time
+ * {@link MysqlMultiActiveLeaseArbiter} to determine a single lease owner at a 
given time
  * for a flow action event. After acquiring the lease, it persists the flow 
action event to the {@link DagActionStore}
  * to be eventually acted upon by the host with the active DagManager. Once it 
has completed this action, it will mark
  * the lease as completed by calling the
- * {@link 
org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter.completeLeaseUse} 
method. Hosts that do not gain
+ * MysqlMultiActiveLeaseArbiter.recordLeaseSuccess method. Hosts that do not 
gain

Review Comment:
   I prefer the javadoc `@link` you had previously



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config, 
MultiActiveLeaseArbiter leaseDet
    * @param eventTimeMillis
    * @throws IOException
    */
-  public void handleNewSchedulerEvent(Properties jobProps, 
DagActionStore.DagAction flowAction, long eventTimeMillis)
+  public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction 
flowAction, long eventTimeMillis)
       throws IOException {
-    LeaseAttemptStatus leaseAttemptStatus =
+    MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
         multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
     // TODO: add a log event or metric for each of these cases
-    switch (leaseAttemptStatus.getClass().getSimpleName()) {
-      case "LeaseObtainedStatus":
-        finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
-        break;
-      case "LeasedToAnotherStatus":
-        scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus) 
leaseAttemptStatus, flowAction, eventTimeMillis);
-        break;
-      case "NoLongerLeasingStatus":
-        break;
-      default:
+    if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+      persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus, flowAction);
+      return;
+    } else if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+      scheduleReminderForEvent(jobProps, 
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+          eventTimeMillis);
+    } else if (leaseAttemptStatus instanceof  
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+      return;
     }
+    log.warn("Received type of leaseAttemptStatus: {} not handled by this 
method", leaseAttemptStatus.getClass().getName());

Review Comment:
   since this represents a gaping hole in our impl, it's actually more 
appropriate to scream/panic/freak out w/ a `RuntimeException`, than it is to 
presume to continue (as if completely shirking all responsibility for the 
`flowAction`)
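
   Roughly what I have in mind, as a self-contained sketch. The status classes here are empty stand-ins, `dispatch` returns a label instead of calling `persistFlowAction` / `scheduleReminderForEvent`, and `UnexpectedStatus` is a hypothetical subtype used only to exercise the failure path.

```java
// Hypothetical stand-ins; only the branch structure mirrors handleTriggerEvent.
final class DispatchSketch {
  abstract static class LeaseAttemptStatus {}
  static final class LeaseObtainedStatus extends LeaseAttemptStatus {}
  static final class LeasedToAnotherStatus extends LeaseAttemptStatus {}
  static final class NoLongerLeasingStatus extends LeaseAttemptStatus {}
  // Not part of the PR: represents a subtype added later without updating the handler.
  static final class UnexpectedStatus extends LeaseAttemptStatus {}

  static String dispatch(LeaseAttemptStatus status) {
    if (status instanceof LeaseObtainedStatus) {
      return "persist flow action";
    } else if (status instanceof LeasedToAnotherStatus) {
      return "schedule reminder";
    } else if (status instanceof NoLongerLeasingStatus) {
      return "nothing to do";
    }
    // Fail loudly rather than log and carry on as if the flow action were handled.
    throw new RuntimeException(
        "Unhandled LeaseAttemptStatus type: " + status.getClass().getName());
  }

  private DispatchSketch() {}
}
```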



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config, 
MultiActiveLeaseArbiter leaseDet
    * @param eventTimeMillis
    * @throws IOException
    */
-  public void handleNewSchedulerEvent(Properties jobProps, 
DagActionStore.DagAction flowAction, long eventTimeMillis)
+  public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction 
flowAction, long eventTimeMillis)
       throws IOException {
-    LeaseAttemptStatus leaseAttemptStatus =
+    MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
         multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
     // TODO: add a log event or metric for each of these cases
-    switch (leaseAttemptStatus.getClass().getSimpleName()) {
-      case "LeaseObtainedStatus":
-        finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
-        break;
-      case "LeasedToAnotherStatus":
-        scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus) 
leaseAttemptStatus, flowAction, eventTimeMillis);
-        break;
-      case "NoLongerLeasingStatus":
-        break;
-      default:
+    if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+      persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus, flowAction);
+      return;
+    } else if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+      scheduleReminderForEvent(jobProps, 
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+          eventTimeMillis);
+    } else if (leaseAttemptStatus instanceof  
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+      return;
     }
+    log.warn("Received type of leaseAttemptStatus: {} not handled by this 
method", leaseAttemptStatus.getClass().getName());
   }
 
   // Called after obtaining a lease to persist the flow action to {@link 
DagActionStore} and mark the lease as done
-  private boolean finalizeLease(LeaseObtainedStatus status, 
DagActionStore.DagAction flowAction) {
+  private boolean 
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status, 
DagActionStore.DagAction flowAction) {
     try {
       this.dagActionStore.addDagAction(flowAction.getFlowGroup(), 
flowAction.getFlowName(),
           flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
-      if (this.dagActionStore.exists(flowAction.getFlowGroup(), 
flowAction.getFlowName(),
-          flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
-        // If the flow action has been persisted to the {@link DagActionStore} 
we can close the lease
-        this.numLeasesCompleted.mark();
-        return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction, 
status.getEventTimestamp(),
-            status.getMyLeaseAcquisitionTimestamp());
-      }
-    } catch (IOException | SQLException e) {
+      // If the flow action has been persisted to the {@link DagActionStore} 
we can close the lease
+      this.numLeasesCompleted.mark();
+      return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction, 
status);
+    } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    // TODO: should this return an error or print a warning log if failed to 
commit to dag action store?
-    return false;
   }
 
   /**
-   * This method is used by {@link 
SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for 
itself
-   * to check on the other participant's progress to finish acting on a flow 
action after the time the lease should
-   * expire.
+   * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to 
schedule a reminder for itself to check on
+   * the other participant's progress to finish acting on a flow action after 
the time the lease should expire.
    * @param jobProps
    * @param status used to extract event to be reminded for and the minimum 
time after which reminder should occur
    * @param originalEventTimeMillis the event timestamp we were originally 
handling
    * @param flowAction
    */
-  private void scheduleReminderForEvent(Properties jobProps, 
LeasedToAnotherStatus status,
+  private void scheduleReminderForEvent(Properties jobProps, 
MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
       DagActionStore.DagAction flowAction, long originalEventTimeMillis) {
     // Add a small randomization to the minimum reminder wait time to avoid 
'thundering herd' issue
-    String cronExpression = 
createCronFromDelayPeriod(status.getMinimumReminderWaitMillis() + 
random.nextInt(staggerUpperBoundSec));
+    String cronExpression = 
createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
+        + random.nextInt(schedulerMaxBackoffMillis));
     jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
-    // Ensure we save the event timestamp that we're setting reminder for, in 
addition to our own event timestamp which may be different
-    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY,
 String.valueOf(status.getReminderEventTimeMillis()));
-    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY,
 String.valueOf(status.getReminderEventTimeMillis()));
+    // Ensure we save the event timestamp that we're setting reminder for to 
have for debugging purposes
+    // in addition to the event we want to initiate
+    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
+        String.valueOf(status.getEventTimeMillis()));
+    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
+        String.valueOf(originalEventTimeMillis));
     JobKey key = new JobKey(flowAction.getFlowName(), 
flowAction.getFlowGroup());
-    Trigger trigger = this.jobScheduler.getTrigger(key, jobProps);
+    // Create a new trigger for the flow in job scheduler that is set to fire 
at the minimum reminder wait time calculated
+    Trigger trigger = this.jobScheduler.createTriggerForJob(key, jobProps);
     try {
-      LOG.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] -  
attempting to schedule reminder for event %s in %s millis",
-          flowAction, originalEventTimeMillis, 
status.getReminderEventTimeMillis(), trigger.getNextFireTime());
+      log.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] -  
attempting to schedule reminder for event %s in %s millis",

Review Comment:
   update to "Flow Trigger Handler -"



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config, 
MultiActiveLeaseArbiter leaseDet
    * @param eventTimeMillis
    * @throws IOException
    */
-  public void handleNewSchedulerEvent(Properties jobProps, 
DagActionStore.DagAction flowAction, long eventTimeMillis)
+  public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction 
flowAction, long eventTimeMillis)
       throws IOException {
-    LeaseAttemptStatus leaseAttemptStatus =
+    MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
         multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
     // TODO: add a log event or metric for each of these cases
-    switch (leaseAttemptStatus.getClass().getSimpleName()) {
-      case "LeaseObtainedStatus":
-        finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
-        break;
-      case "LeasedToAnotherStatus":
-        scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus) 
leaseAttemptStatus, flowAction, eventTimeMillis);
-        break;
-      case "NoLongerLeasingStatus":
-        break;
-      default:
+    if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+      persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus, flowAction);
+      return;
+    } else if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+      scheduleReminderForEvent(jobProps, 
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+          eventTimeMillis);
+    } else if (leaseAttemptStatus instanceof  
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+      return;
     }
+    log.warn("Received type of leaseAttemptStatus: {} not handled by this 
method", leaseAttemptStatus.getClass().getName());
   }
 
   // Called after obtaining a lease to persist the flow action to {@link 
DagActionStore} and mark the lease as done
-  private boolean finalizeLease(LeaseObtainedStatus status, 
DagActionStore.DagAction flowAction) {
+  private boolean 
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status, 
DagActionStore.DagAction flowAction) {
     try {
       this.dagActionStore.addDagAction(flowAction.getFlowGroup(), 
flowAction.getFlowName(),
           flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
-      if (this.dagActionStore.exists(flowAction.getFlowGroup(), 
flowAction.getFlowName(),
-          flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
-        // If the flow action has been persisted to the {@link DagActionStore} 
we can close the lease
-        this.numLeasesCompleted.mark();
-        return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction, 
status.getEventTimestamp(),
-            status.getMyLeaseAcquisitionTimestamp());
-      }
-    } catch (IOException | SQLException e) {
+      // If the flow action has been persisted to the {@link DagActionStore} 
we can close the lease
+      this.numLeasesCompleted.mark();
+      return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction, 
status);
+    } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    // TODO: should this return an error or print a warning log if failed to 
commit to dag action store?
-    return false;
   }
 
   /**
-   * This method is used by {@link 
SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for 
itself
-   * to check on the other participant's progress to finish acting on a flow 
action after the time the lease should
-   * expire.
+   * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to 
schedule a reminder for itself to check on
+   * the other participant's progress to finish acting on a flow action after 
the time the lease should expire.
    * @param jobProps
    * @param status used to extract event to be reminded for and the minimum 
time after which reminder should occur
    * @param originalEventTimeMillis the event timestamp we were originally 
handling
    * @param flowAction
    */
-  private void scheduleReminderForEvent(Properties jobProps, 
LeasedToAnotherStatus status,
+  private void scheduleReminderForEvent(Properties jobProps, 
MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
       DagActionStore.DagAction flowAction, long originalEventTimeMillis) {
     // Add a small randomization to the minimum reminder wait time to avoid 
'thundering herd' issue
-    String cronExpression = 
createCronFromDelayPeriod(status.getMinimumReminderWaitMillis() + 
random.nextInt(staggerUpperBoundSec));
+    String cronExpression = 
createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
+        + random.nextInt(schedulerMaxBackoffMillis));
     jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
-    // Ensure we save the event timestamp that we're setting reminder for, in 
addition to our own event timestamp which may be different
-    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY,
 String.valueOf(status.getReminderEventTimeMillis()));
-    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY,
 String.valueOf(status.getReminderEventTimeMillis()));
+    // Ensure we save the event timestamp that we're setting reminder for to 
have for debugging purposes
+    // in addition to the event we want to initiate
+    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
+        String.valueOf(status.getEventTimeMillis()));
+    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
+        String.valueOf(originalEventTimeMillis));
     JobKey key = new JobKey(flowAction.getFlowName(), 
flowAction.getFlowGroup());
-    Trigger trigger = this.jobScheduler.getTrigger(key, jobProps);
+    // Create a new trigger for the flow in job scheduler that is set to fire 
at the minimum reminder wait time calculated
+    Trigger trigger = this.jobScheduler.createTriggerForJob(key, jobProps);
     try {
-      LOG.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] -  
attempting to schedule reminder for event %s in %s millis",
-          flowAction, originalEventTimeMillis, 
status.getReminderEventTimeMillis(), trigger.getNextFireTime());
+      log.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] -  
attempting to schedule reminder for event %s in %s millis",
+          flowAction, originalEventTimeMillis, status.getEventTimeMillis(), 
trigger.getNextFireTime());
       this.schedulerService.getScheduler().scheduleJob(trigger);
     } catch (SchedulerException e) {
-      LOG.warn("Failed to add job reminder due to SchedulerException for job 
%s trigger event %s ", key, status.getReminderEventTimeMillis(), e);
+      log.warn("Failed to add job reminder due to SchedulerException for job 
%s trigger event %s ", key, status.getEventTimeMillis(), e);
     }
-    LOG.info(String.format("Scheduler Lease Algo Handler - [%s, 
eventTimestamp: %s] - SCHEDULED REMINDER for event %s in %s millis",
-        flowAction, originalEventTimeMillis, 
status.getReminderEventTimeMillis(), trigger.getNextFireTime()));
+    log.info(String.format("Scheduler Lease Algo Handler - [%s, 
eventTimestamp: %s] - SCHEDULED REMINDER for event %s in %s millis",

Review Comment:
   (also here)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config, 
MultiActiveLeaseArbiter leaseDet
    * @param eventTimeMillis
    * @throws IOException
    */
-  public void handleNewSchedulerEvent(Properties jobProps, 
DagActionStore.DagAction flowAction, long eventTimeMillis)
+  public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction 
flowAction, long eventTimeMillis)
       throws IOException {
-    LeaseAttemptStatus leaseAttemptStatus =
+    MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
         multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
     // TODO: add a log event or metric for each of these cases
-    switch (leaseAttemptStatus.getClass().getSimpleName()) {
-      case "LeaseObtainedStatus":
-        finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
-        break;
-      case "LeasedToAnotherStatus":
-        scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus) 
leaseAttemptStatus, flowAction, eventTimeMillis);
-        break;
-      case "NoLongerLeasingStatus":
-        break;
-      default:
+    if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+      persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus, flowAction);
+      return;
+    } else if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+      scheduleReminderForEvent(jobProps, 
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+          eventTimeMillis);
+    } else if (leaseAttemptStatus instanceof  
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+      return;
     }
+    log.warn("Received type of leaseAttemptStatus: {} not handled by this 
method", leaseAttemptStatus.getClass().getName());
   }
 
   // Called after obtaining a lease to persist the flow action to {@link 
DagActionStore} and mark the lease as done
-  private boolean finalizeLease(LeaseObtainedStatus status, 
DagActionStore.DagAction flowAction) {
+  private boolean 
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status, 
DagActionStore.DagAction flowAction) {
     try {
       this.dagActionStore.addDagAction(flowAction.getFlowGroup(), 
flowAction.getFlowName(),
           flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
-      if (this.dagActionStore.exists(flowAction.getFlowGroup(), 
flowAction.getFlowName(),
-          flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
-        // If the flow action has been persisted to the {@link DagActionStore} 
we can close the lease
-        this.numLeasesCompleted.mark();
-        return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction, 
status.getEventTimestamp(),
-            status.getMyLeaseAcquisitionTimestamp());
-      }
-    } catch (IOException | SQLException e) {
+      // If the flow action has been persisted to the {@link DagActionStore} 
we can close the lease
+      this.numLeasesCompleted.mark();
+      return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction, 
status);
+    } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    // TODO: should this return an error or print a warning log if failed to 
commit to dag action store?
-    return false;
   }
 
   /**
-   * This method is used by {@link 
SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for 
itself
-   * to check on the other participant's progress to finish acting on a flow 
action after the time the lease should
-   * expire.
+   * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to 
schedule a reminder for itself to check on
+   * the other participant's progress to finish acting on a flow action after 
the time the lease should expire.
    * @param jobProps
    * @param status used to extract event to be reminded for and the minimum 
time after which reminder should occur
    * @param originalEventTimeMillis the event timestamp we were originally 
handling
    * @param flowAction
    */
-  private void scheduleReminderForEvent(Properties jobProps, 
LeasedToAnotherStatus status,
+  private void scheduleReminderForEvent(Properties jobProps, 
MultiActiveLeaseArbiter.LeasedToAnotherStatus status,

Review Comment:
   it may be necessary to overload this w/ a form taking 
`LeaseObtainedStatus`...



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java:
##########
@@ -55,31 +55,27 @@ public void 
resume(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, E
     try {
       // If an existing resume request is still pending then do not accept 
this request
       if (this.dagActionStore.exists(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME)) {
-        this.handleException(flowGroup, flowName, flowExecutionId.toString(), 
DagActionStore.FlowActionType.RESUME,
-            new RuntimeException("There is already a pending RESUME action for 
this flow. Please wait to resubmit and wait for"
-                + " action to be completed."));
+        this.handleException("There is already a pending RESUME action for 
this flow. Please wait to resubmit and wait "

Review Comment:
   nit: `handleError` / `prepareError` (?)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -166,8 +165,8 @@ public 
GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
       Config config,
       Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, 
Optional<TopologyCatalog> topologyCatalog,
       Orchestrator orchestrator, SchedulerService schedulerService, 
Optional<UserQuotaManager> quotaManager, Optional<Logger> log,
-      @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean warmStandbyEnabled, 
@Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean 
multiActiveSchedulerEnabled,
-      SchedulerLeaseAlgoHandler schedulerLeaseAlgoHandler) throws Exception {
+      @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean warmStandbyEnabled,
+      Optional<FlowTriggerHandler> schedulerLeaseAlgoHandler) throws Exception 
{

Review Comment:
   rename `schedulerLeaseAlgoHandler`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java:
##########
@@ -55,31 +55,27 @@ public void 
resume(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, E
     try {
       // If an existing resume request is still pending then do not accept 
this request
       if (this.dagActionStore.exists(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME)) {
-        this.handleException(flowGroup, flowName, flowExecutionId.toString(), 
DagActionStore.FlowActionType.RESUME,
-            new RuntimeException("There is already a pending RESUME action for 
this flow. Please wait to resubmit and wait for"
-                + " action to be completed."));
+        this.handleException("There is already a pending RESUME action for 
this flow. Please wait to resubmit and wait "
+            + "for action to be completed.", HttpStatus.S_409_CONFLICT);
         return;
       }
       this.dagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME);
     } catch (IOException | SQLException e) {
       log.warn(
           String.format("Failed to add execution resume action for flow %s %s 
%s to dag action store due to", flowGroup,
               flowName, flowExecutionId), e);
-      this.handleException(flowGroup, flowName, flowExecutionId.toString(), 
DagActionStore.FlowActionType.RESUME, e);
+      this.handleException(e.getMessage(), 
HttpStatus.S_500_INTERNAL_SERVER_ERROR);
     }
 
   }
 
-  private void handleException (String flowGroup, String flowName, String 
flowExecutionId, DagActionStore.FlowActionType flowActionType, Exception e) {
-    try {
-      if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId, 
flowActionType)) {
-        throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, 
e.getMessage());
-      } else {
-        throw new 
RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
-      }
-    } catch (IOException | SQLException ex) {
-      throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, 
e.getMessage());
+  private void handleException(String exceptionMessage, HttpStatus errorType) {
+    if (errorType == HttpStatus.S_409_CONFLICT) {
+      throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, 
exceptionMessage);
+    } else if (errorType == HttpStatus.S_400_BAD_REQUEST) {
+      new UpdateResponse(HttpStatus.S_400_BAD_REQUEST);

Review Comment:
   the other two throw... should this not?
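
   to illustrate the concern, here is a minimal sketch in which every status results in a throw. `Status` and `ServiceException` are hypothetical stand-ins for `HttpStatus` and `RestLiServiceException` (so the snippet compiles without rest.li), and `handleError` is just the name floated in the earlier nit:

```java
// Hypothetical stand-ins for HttpStatus and RestLiServiceException (assumptions, not the real types)
enum Status { S_400_BAD_REQUEST, S_409_CONFLICT, S_500_INTERNAL_SERVER_ERROR }

class ServiceException extends RuntimeException {
  final Status status;

  ServiceException(Status status, String message) {
    super(message);
    this.status = status;
  }
}

class ErrorHandlingSketch {
  // The caller already chooses the status, so every case can throw the same way;
  // no branch constructs a response object and then discards it.
  static void handleError(String message, Status status) {
    throw new ServiceException(status, message);
  }
}
```

   with this shape the 400 case can no longer fall through silently, since there are no per-status branches left.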



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -66,24 +67,37 @@
  * host should actually complete its work while having the lease and then mark 
the flow action as NULL to indicate no
  * further leasing should be done for the event.
  */
-public class MySQLMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
+@Slf4j
+public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
   /** `j.u.Function` variant for an operation that may @throw IOException or 
SQLException: preserves method signature checked exceptions */
   @FunctionalInterface
   protected interface CheckedFunction<T, R> {
     R apply(T t) throws IOException, SQLException;
   }
 
-  public static final String CONFIG_PREFIX = "MySQLMultiActiveLeaseArbiter";
+  public static final String CONFIG_PREFIX = "MysqlMultiActiveLeaseArbiter";

Review Comment:
   better to define the prefix in `ConfigurationKeys`
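
   roughly what I have in mind, as a sketch only. `LeaseArbiterKeysSketch` is a hypothetical stand-in for `ConfigurationKeys`; `MYSQL_LEASE_ARBITER_PREFIX` is the name suggested earlier in this review, and the key suffixes are copied from the values in this PR:

```java
// Sketch only: stands in for ConfigurationKeys; the class name is hypothetical.
final class LeaseArbiterKeysSketch {
  // Single definition of the prefix, shared by every key below
  static final String MYSQL_LEASE_ARBITER_PREFIX = "MysqlMultiActiveLeaseArbiter";

  static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
      MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
  static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY =
      MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiterTable";

  private LeaseArbiterKeysSketch() {
    // constants holder, not meant to be instantiated
  }
}
```

   the arbiter class could then reference the prefix from there instead of declaring its own `CONFIG_PREFIX`.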



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -199,23 +197,27 @@ public LeaseAttemptStatus 
tryAcquireLease(DagActionStore.DagAction flowAction, l
       int leaseValidityStatus = resultSet.getInt(4);

Review Comment:
   hopefully doesn't feel like overkill, but I'd abstract this by defining a 
`static` inner `@Data` class with an overloaded constructor (or `static` 
factory method) taking a `ResultSet`
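
   a rough sketch of the shape, written without Lombok so it stands alone. the class and method names are hypothetical, and only column 4 (`leaseValidityStatus`) is taken from the hunk above; the remaining columns are left out because their layout isn't shown here and would be read the same way:

```java
import java.sql.ResultSet;
import java.sql.SQLException;

class LeaseArbiterRowsSketch {
  // Hypothetical value class holding what tryAcquireLease reads from a result row.
  static final class LeaseInfoRow {
    private final int leaseValidityStatus;

    private LeaseInfoRow(int leaseValidityStatus) {
      this.leaseValidityStatus = leaseValidityStatus;
    }

    // Static factory: keeps all column-index knowledge in one place.
    static LeaseInfoRow fromResultSet(ResultSet resultSet) throws SQLException {
      // Column 4 is the only index visible in the quoted diff.
      return new LeaseInfoRow(resultSet.getInt(4));
    }

    int getLeaseValidityStatus() {
      return leaseValidityStatus;
    }
  }
}
```

   with Lombok, `@Data` would presumably replace the hand-written getter.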



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -100,8 +114,8 @@ public class MySQLMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   // Insert or update row to acquire lease if values have not changed since 
the previous read
   // Need to define three separate statements to handle cases where row does 
not exist or has null values to check
   protected static final String 
CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s "
-      + "(flow_group, flow_name, flow_execution_id, flow_action, 
event_timestamp) VALUES (?, ?, ?, ?, ?) WHERE NOT "
-      + "EXISTS (SELECT * FROM %s " + WHERE_CLAUSE_TO_MATCH_KEY + "); " + 
SELECT_AFTER_INSERT_STATEMENT;
+      + "(flow_group, flow_name, flow_execution_id, flow_action, 
event_timestamp) VALUES (?, ?, ?, ?, ?); "
+      + SELECT_AFTER_INSERT_STATEMENT;

Review Comment:
   just thinking... it may be clearer not to append this to the constant, but 
rather to catenate them at point of use



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -579,11 +579,10 @@ public void run() {
       }
     }
 
-    private void clearUpDagAction(DagId dagId) throws IOException {
+    private void clearUpDagAction(DagId dagId, DagActionStore.FlowActionType 
flowActionType) throws IOException {

Review Comment:
   nit: "clear up" seems indirect and w/ a wide variety of connotations (from 
misunderstandings and weather... to skin care).
   
   what are we doing here?  "resolving" the action?  "dismissing"?  "deleting"?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java:
##########
@@ -581,7 +581,7 @@ public void close() throws IOException {
   /**
    * Get a {@link org.quartz.Trigger} from the given job configuration 
properties.
    */
-  public Trigger getTrigger(JobKey jobKey, Properties jobProps) {
+  public Trigger createTriggerForJob(JobKey jobKey, Properties jobProps) {

Review Comment:
   couldn't this be `static`?  if so, let's!



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -579,11 +579,10 @@ public void run() {
       }
     }
 
-    private void clearUpDagAction(DagId dagId) throws IOException {
+    private void clearUpDagAction(DagId dagId, DagActionStore.FlowActionType 
flowActionType) throws IOException {
       if (this.dagActionStore.isPresent()) {
         this.dagActionStore.get().deleteDagAction(
-            new DagActionStore.DagAction(dagId.flowGroup, dagId.flowName, 
dagId.flowExecutionId,
-                DagActionStore.FlowActionType.KILL));
+            new DagActionStore.DagAction(dagId.flowGroup, dagId.flowName, 
dagId.flowExecutionId, flowActionType));

Review Comment:
   minor, but depending on how prevalent this is, it might be reasonable to 
supply an additional constructor:
   ```
   public DagAction(DagId dagId, FlowActionType flowActionType) { ... }
   ```
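
   filled out a little, the delegating constructor might look like the sketch below. the nested types are hypothetical stand-ins for the real `DagId`, `DagAction` and `FlowActionType`, and the `String` field types are an assumption:

```java
class DagActionSketch {
  // Stand-in enum; the real FlowActionType may carry other values.
  enum FlowActionType { KILL, RESUME }

  static final class DagId {
    final String flowGroup;
    final String flowName;
    final String flowExecutionId;

    DagId(String flowGroup, String flowName, String flowExecutionId) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
    }
  }

  static final class DagAction {
    final String flowGroup;
    final String flowName;
    final String flowExecutionId;
    final FlowActionType flowActionType;

    DagAction(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
      this.flowActionType = flowActionType;
    }

    // Convenience form: unpacks the DagId and delegates to the four-argument constructor.
    DagAction(DagId dagId, FlowActionType flowActionType) {
      this(dagId.flowGroup, dagId.flowName, dagId.flowExecutionId, flowActionType);
    }
  }
}
```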



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -106,22 +104,20 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
   private FlowStatusGenerator flowStatusGenerator;
 
   private UserQuotaManager quotaManager;
-  private boolean isMultiActiveSchedulerEnabled;
-  private SchedulerLeaseAlgoHandler schedulerLeaseAlgoHandler;
+  private Optional<FlowTriggerHandler> schedulerLeaseAlgoHandler;

Review Comment:
   rename `schedulerLeaseAlgoHandler`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -317,26 +312,27 @@ public void orchestrate(Spec spec, Properties jobProps, 
long triggerTimestampMil
       }
 
       // If multi-active scheduler is enabled do not pass onto DagManager, 
otherwise scheduler forwards it directly
-      if (this.isMultiActiveSchedulerEnabled) {
+      if (schedulerLeaseAlgoHandler.isPresent()) {
+        // If triggerTimestampMillis is 0, then it was not set by the job 
trigger handler, and we cannot handle this event
+        if (triggerTimestampMillis == 0L) {
+          _log.warn("Skipping execution of spec: {} because missing trigger 
timestamp in job properties",
+              jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+          flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow orchestration 
skipped because no trigger timestamp "
+              + "associated with flow action.");
+          if (this.eventSubmitter.isPresent()) {
+            new TimingEvent(this.eventSubmitter.get(), 
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);

Review Comment:
   tip: `ifPresent()`
   
   (note: this works easily and naturally... unless the lambda body throws checked exceptions)
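
   a small sketch of the idiom, assuming the field is a `java.util.Optional` (Guava's `Optional` has no `ifPresent`). `Submitter` is a hypothetical stand-in for the event submitter and only records what it receives:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class IfPresentSketch {
  // Hypothetical stand-in for an event submitter.
  static final class Submitter {
    final List<String> submitted = new ArrayList<>();

    void submit(String eventName) {
      submitted.add(eventName);
    }
  }

  static void reportFailure(Optional<Submitter> submitter) {
    // Same effect as: if (submitter.isPresent()) { submitter.get().submit("FLOW_FAILED"); }
    submitter.ifPresent(s -> s.submit("FLOW_FAILED"));
  }
}
```

   the lambda runs only when a value is present, so the explicit `isPresent()` / `get()` pair goes away.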



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config, 
MultiActiveLeaseArbiter leaseDet
    * @param eventTimeMillis
    * @throws IOException
    */
-  public void handleNewSchedulerEvent(Properties jobProps, 
DagActionStore.DagAction flowAction, long eventTimeMillis)
+  public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction 
flowAction, long eventTimeMillis)
       throws IOException {
-    LeaseAttemptStatus leaseAttemptStatus =
+    MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
         multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
     // TODO: add a log event or metric for each of these cases
-    switch (leaseAttemptStatus.getClass().getSimpleName()) {
-      case "LeaseObtainedStatus":
-        finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
-        break;
-      case "LeasedToAnotherStatus":
-        scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus) 
leaseAttemptStatus, flowAction, eventTimeMillis);
-        break;
-      case "NoLongerLeasingStatus":
-        break;
-      default:
+    if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+      persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus, flowAction);

Review Comment:
   shouldn't we confirm the return value to determine whether to schedule a 
reminder?
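
   something along these lines, as a sketch under assumptions: the two callbacks are hypothetical stand-ins for `persistFlowAction` and for a reminder-scheduling call (which would likely need the `LeaseObtainedStatus` overload mentioned in another comment):

```java
import java.util.function.BooleanSupplier;

class LeaseOutcomeSketch {
  // Runs the persist step and falls back to scheduling a reminder when it reports failure.
  static boolean persistOrRemind(BooleanSupplier persistFlowAction, Runnable scheduleReminder) {
    boolean persisted = persistFlowAction.getAsBoolean();
    if (!persisted) {
      // The lease was obtained but not recorded as complete, so revisit the event later.
      scheduleReminder.run();
    }
    return persisted;
  }
}
```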



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -378,6 +374,26 @@ public void orchestrate(Spec spec, Properties jobProps, 
long triggerTimestampMil
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - 
startTime, TimeUnit.NANOSECONDS);
   }
 
+  public void submitFlowToDagManager(FlowSpec flowSpec, 
Optional<Dag<JobExecutionPlan>> jobExecutionPlanDag)
+      throws IOException {
+    if (!jobExecutionPlanDag.isPresent()) {
+      jobExecutionPlanDag = Optional.of(specCompiler.compileFlow(flowSpec));
+    }

Review Comment:
   while `Optional` is generally great, it's only needed when callers don't 
know whether they are holding on to a `Dag<>`.  when the caller knows this 
statically, it's clearer to overload `submitFlowToDagManager` with both a unary 
and a binary form.
   
   here the unary form merely compiles the DAG before delegating to the binary 
one:
   ```
   public void submitFlowToDagManager(FlowSpec flowSpec) throws IOException {
     submitFlowToDagManager(flowSpec, specCompiler.compileFlow(flowSpec));
   }
   ```



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config, 
MultiActiveLeaseArbiter leaseDet
    * @param eventTimeMillis
    * @throws IOException
    */
-  public void handleNewSchedulerEvent(Properties jobProps, 
DagActionStore.DagAction flowAction, long eventTimeMillis)
+  public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction 
flowAction, long eventTimeMillis)
       throws IOException {
-    LeaseAttemptStatus leaseAttemptStatus =
+    MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
         multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
     // TODO: add a log event or metric for each of these cases
-    switch (leaseAttemptStatus.getClass().getSimpleName()) {
-      case "LeaseObtainedStatus":
-        finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
-        break;
-      case "LeasedToAnotherStatus":
-        scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus) 
leaseAttemptStatus, flowAction, eventTimeMillis);
-        break;
-      case "NoLongerLeasingStatus":
-        break;
-      default:
+    if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+      persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus, flowAction);
+      return;
+    } else if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+      scheduleReminderForEvent(jobProps, 
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+          eventTimeMillis);
+    } else if (leaseAttemptStatus instanceof  
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+      return;
     }
+    log.warn("Received type of leaseAttemptStatus: {} not handled by this 
method", leaseAttemptStatus.getClass().getName());
   }
 
   // Called after obtaining a lease to persist the flow action to {@link 
DagActionStore} and mark the lease as done
-  private boolean finalizeLease(LeaseObtainedStatus status, 
DagActionStore.DagAction flowAction) {
+  private boolean 
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status, 
DagActionStore.DagAction flowAction) {
     try {
       this.dagActionStore.addDagAction(flowAction.getFlowGroup(), 
flowAction.getFlowName(),
           flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
-      if (this.dagActionStore.exists(flowAction.getFlowGroup(), 
flowAction.getFlowName(),
-          flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
-        // If the flow action has been persisted to the {@link DagActionStore} 
we can close the lease
-        this.numLeasesCompleted.mark();
-        return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction, 
status.getEventTimestamp(),
-            status.getMyLeaseAcquisitionTimestamp());
-      }
-    } catch (IOException | SQLException e) {
+      // If the flow action has been persisted to the {@link DagActionStore} 
we can close the lease
+      this.numLeasesCompleted.mark();
+      return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction, 
status);
+    } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    // TODO: should this return an error or print a warning log if failed to 
commit to dag action store?
-    return false;
   }
 
   /**
-   * This method is used by {@link 
SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for 
itself
-   * to check on the other participant's progress to finish acting on a flow 
action after the time the lease should
-   * expire.
+   * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to 
schedule a reminder for itself to check on
+   * the other participant's progress to finish acting on a flow action after 
the time the lease should expire.
    * @param jobProps
    * @param status used to extract event to be reminded for and the minimum 
time after which reminder should occur
    * @param originalEventTimeMillis the event timestamp we were originally 
handling
    * @param flowAction
    */
-  private void scheduleReminderForEvent(Properties jobProps, 
LeasedToAnotherStatus status,
+  private void scheduleReminderForEvent(Properties jobProps, 
MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
       DagActionStore.DagAction flowAction, long originalEventTimeMillis) {
     // Add a small randomization to the minimum reminder wait time to avoid 
'thundering herd' issue
-    String cronExpression = 
createCronFromDelayPeriod(status.getMinimumReminderWaitMillis() + 
random.nextInt(staggerUpperBoundSec));
+    String cronExpression = 
createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
+        + random.nextInt(schedulerMaxBackoffMillis));
     jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
-    // Ensure we save the event timestamp that we're setting reminder for, in 
addition to our own event timestamp which may be different
-    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY,
 String.valueOf(status.getReminderEventTimeMillis()));
-    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY,
 String.valueOf(status.getReminderEventTimeMillis()));
+    // Ensure we save the event timestamp that we're setting reminder for to 
have for debugging purposes
+    // in addition to the event we want to initiate
+    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
+        String.valueOf(status.getEventTimeMillis()));
+    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
+        String.valueOf(originalEventTimeMillis));
     JobKey key = new JobKey(flowAction.getFlowName(), 
flowAction.getFlowGroup());
-    Trigger trigger = this.jobScheduler.getTrigger(key, jobProps);
+    // Create a new trigger for the flow in job scheduler that is set to fire 
at the minimum reminder wait time calculated
+    Trigger trigger = this.jobScheduler.createTriggerForJob(key, jobProps);

Review Comment:
   would be ideal if this invocation could be `static`!



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config, 
MultiActiveLeaseArbiter leaseDet
    * @param eventTimeMillis
    * @throws IOException
    */
-  public void handleNewSchedulerEvent(Properties jobProps, 
DagActionStore.DagAction flowAction, long eventTimeMillis)
+  public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction 
flowAction, long eventTimeMillis)
       throws IOException {
-    LeaseAttemptStatus leaseAttemptStatus =
+    MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
         multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
     // TODO: add a log event or metric for each of these cases
-    switch (leaseAttemptStatus.getClass().getSimpleName()) {
-      case "LeaseObtainedStatus":
-        finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
-        break;
-      case "LeasedToAnotherStatus":
-        scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus) 
leaseAttemptStatus, flowAction, eventTimeMillis);
-        break;
-      case "NoLongerLeasingStatus":
-        break;
-      default:
+    if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+      persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus, flowAction);
+      return;
+    } else if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+      scheduleReminderForEvent(jobProps, 
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+          eventTimeMillis);
+    } else if (leaseAttemptStatus instanceof  
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+      return;
     }
+    log.warn("Received type of leaseAttemptStatus: {} not handled by this 
method", leaseAttemptStatus.getClass().getName());
   }
 
   // Called after obtaining a lease to persist the flow action to {@link 
DagActionStore} and mark the lease as done
-  private boolean finalizeLease(LeaseObtainedStatus status, 
DagActionStore.DagAction flowAction) {
+  private boolean 
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status, 
DagActionStore.DagAction flowAction) {
     try {
       this.dagActionStore.addDagAction(flowAction.getFlowGroup(), 
flowAction.getFlowName(),
           flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
-      if (this.dagActionStore.exists(flowAction.getFlowGroup(), 
flowAction.getFlowName(),
-          flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
-        // If the flow action has been persisted to the {@link DagActionStore} 
we can close the lease
-        this.numLeasesCompleted.mark();
-        return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction, 
status.getEventTimestamp(),
-            status.getMyLeaseAcquisitionTimestamp());
-      }
-    } catch (IOException | SQLException e) {
+      // If the flow action has been persisted to the {@link DagActionStore} 
we can close the lease
+      this.numLeasesCompleted.mark();
+      return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction, 
status);
+    } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    // TODO: should this return an error or print a warning log if failed to 
commit to dag action store?
-    return false;
   }
 
   /**
-   * This method is used by {@link 
SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for 
itself
-   * to check on the other participant's progress to finish acting on a flow 
action after the time the lease should
-   * expire.
+   * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to 
schedule a reminder for itself to check on

Review Comment:
   a. I liked the `{@link}` that the previous wording had
   b. "a reminder for itself" => "a self-reminder"



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config, 
MultiActiveLeaseArbiter leaseDet
    * @param eventTimeMillis
    * @throws IOException
    */
-  public void handleNewSchedulerEvent(Properties jobProps, 
DagActionStore.DagAction flowAction, long eventTimeMillis)
+  public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction 
flowAction, long eventTimeMillis)
       throws IOException {
-    LeaseAttemptStatus leaseAttemptStatus =
+    MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
         multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
     // TODO: add a log event or metric for each of these cases
-    switch (leaseAttemptStatus.getClass().getSimpleName()) {
-      case "LeaseObtainedStatus":
-        finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
-        break;
-      case "LeasedToAnotherStatus":
-        scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus) 
leaseAttemptStatus, flowAction, eventTimeMillis);
-        break;
-      case "NoLongerLeasingStatus":
-        break;
-      default:
+    if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+      persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus, flowAction);
+      return;
+    } else if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+      scheduleReminderForEvent(jobProps, 
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+          eventTimeMillis);
+    } else if (leaseAttemptStatus instanceof  
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+      return;
     }
+    log.warn("Received type of leaseAttemptStatus: {} not handled by this 
method", leaseAttemptStatus.getClass().getName());
   }
 
   // Called after obtaining a lease to persist the flow action to {@link 
DagActionStore} and mark the lease as done
-  private boolean finalizeLease(LeaseObtainedStatus status, 
DagActionStore.DagAction flowAction) {
+  private boolean 
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status, 
DagActionStore.DagAction flowAction) {
     try {
       this.dagActionStore.addDagAction(flowAction.getFlowGroup(), 
flowAction.getFlowName(),
           flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
-      if (this.dagActionStore.exists(flowAction.getFlowGroup(), 
flowAction.getFlowName(),
-          flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
-        // If the flow action has been persisted to the {@link DagActionStore} 
we can close the lease
-        this.numLeasesCompleted.mark();
-        return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction, 
status.getEventTimestamp(),
-            status.getMyLeaseAcquisitionTimestamp());
-      }
-    } catch (IOException | SQLException e) {
+      // If the flow action has been persisted to the {@link DagActionStore} 
we can close the lease
+      this.numLeasesCompleted.mark();
+      return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction, 
status);
+    } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    // TODO: should this return an error or print a warning log if failed to 
commit to dag action store?
-    return false;
   }
 
   /**
-   * This method is used by {@link 
SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for 
itself
-   * to check on the other participant's progress to finish acting on a flow 
action after the time the lease should
-   * expire.
+   * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to 
schedule a reminder for itself to check on
+   * the other participant's progress to finish acting on a flow action after 
the time the lease should expire.
    * @param jobProps
    * @param status used to extract event to be reminded for and the minimum 
time after which reminder should occur
    * @param originalEventTimeMillis the event timestamp we were originally 
handling
    * @param flowAction
    */
-  private void scheduleReminderForEvent(Properties jobProps, 
LeasedToAnotherStatus status,
+  private void scheduleReminderForEvent(Properties jobProps, 
MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
       DagActionStore.DagAction flowAction, long originalEventTimeMillis) {
     // Add a small randomization to the minimum reminder wait time to avoid 
'thundering herd' issue
-    String cronExpression = 
createCronFromDelayPeriod(status.getMinimumReminderWaitMillis() + 
random.nextInt(staggerUpperBoundSec));
+    String cronExpression = 
createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
+        + random.nextInt(schedulerMaxBackoffMillis));
     jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
-    // Ensure we save the event timestamp that we're setting reminder for, in 
addition to our own event timestamp which may be different
-    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY,
 String.valueOf(status.getReminderEventTimeMillis()));
-    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY,
 String.valueOf(status.getReminderEventTimeMillis()));
+    // Ensure we save the event timestamp that we're setting reminder for to 
have for debugging purposes
+    // in addition to the event we want to initiate
+    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
+        String.valueOf(status.getEventTimeMillis()));
+    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
+        String.valueOf(originalEventTimeMillis));
     JobKey key = new JobKey(flowAction.getFlowName(), 
flowAction.getFlowGroup());
-    Trigger trigger = this.jobScheduler.getTrigger(key, jobProps);
+    // Create a new trigger for the flow in job scheduler that is set to fire 
at the minimum reminder wait time calculated
+    Trigger trigger = this.jobScheduler.createTriggerForJob(key, jobProps);
     try {
-      LOG.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] -  
attempting to schedule reminder for event %s in %s millis",
-          flowAction, originalEventTimeMillis, 
status.getReminderEventTimeMillis(), trigger.getNextFireTime());
+      log.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] -  
attempting to schedule reminder for event %s in %s millis",
+          flowAction, originalEventTimeMillis, status.getEventTimeMillis(), 
trigger.getNextFireTime());
       this.schedulerService.getScheduler().scheduleJob(trigger);
     } catch (SchedulerException e) {
-      LOG.warn("Failed to add job reminder due to SchedulerException for job 
%s trigger event %s ", key, status.getReminderEventTimeMillis(), e);
+      log.warn("Failed to add job reminder due to SchedulerException for job 
%s trigger event %s ", key, status.getEventTimeMillis(), e);
     }
-    LOG.info(String.format("Scheduler Lease Algo Handler - [%s, 
eventTimestamp: %s] - SCHEDULED REMINDER for event %s in %s millis",
-        flowAction, originalEventTimeMillis, 
status.getReminderEventTimeMillis(), trigger.getNextFireTime()));
+    log.info(String.format("Scheduler Lease Algo Handler - [%s, 
eventTimestamp: %s] - SCHEDULED REMINDER for event %s in %s millis",
+        flowAction, originalEventTimeMillis, status.getEventTimeMillis(), 
trigger.getNextFireTime()));
   }
 
   /**
    * These methods should only be called from the Orchestrator or JobScheduler 
classes as it directly adds jobs to the
    * Quartz scheduler
-   * @param delayPeriodSeconds
+   * @param delayPeriodMillis
    * @return
    */
-  protected static String createCronFromDelayPeriod(long delayPeriodSeconds) {
+  protected static String createCronFromDelayPeriod(long delayPeriodMillis) {
     LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC"));
-    LocalDateTime delaySecondsLater = now.plus(delayPeriodSeconds, 
ChronoUnit.SECONDS);
+    LocalDateTime delaySecondsLater = now.plus(delayPeriodMillis, 
ChronoUnit.MILLIS);

Review Comment:
   nit: `delaySecondsLater` is neither seconds nor an interval, but an absolute 
time in millis
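
To make the naming nit concrete, here is a minimal sketch, not the PR's actual code. The class name `ReminderCronSketch`, the variable name `reminderTime`, the extra `now` parameter (added only so the result is deterministic), and the Quartz-style cron pattern are all assumptions for illustration; the diff above does not show how the method formats its result.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

final class ReminderCronSketch {
  // Assumed Quartz field order: seconds minutes hours day-of-month month day-of-week year.
  // The '?' is quoted so it is emitted literally.
  private static final DateTimeFormatter QUARTZ_CRON_FORMAT =
      DateTimeFormatter.ofPattern("ss mm HH dd MM '?' yyyy");

  // Hypothetical overload that takes the current time explicitly, for testability.
  static String createCronFromDelayPeriod(LocalDateTime now, long delayPeriodMillis) {
    // An absolute point in time, so the name mentions neither seconds nor a delay.
    LocalDateTime reminderTime = now.plus(delayPeriodMillis, ChronoUnit.MILLIS);
    // Cron has second granularity, so any sub-second remainder is dropped here.
    return reminderTime.format(QUARTZ_CRON_FORMAT);
  }

  // Mirrors the signature in the diff; reads the clock in UTC as the diff does.
  static String createCronFromDelayPeriod(long delayPeriodMillis) {
    return createCronFromDelayPeriod(LocalDateTime.now(ZoneId.of("UTC")), delayPeriodMillis);
  }
}
```

With `now` fixed at 2023-06-13T10:15:30 and a delay of 90500 ms, the sketch yields `00 17 10 13 06 ? 2023`.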



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -165,9 +161,8 @@ public Orchestrator(Config config, 
Optional<TopologyCatalog> topologyCatalog, Op
 
   @Inject
   public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator, 
Optional<TopologyCatalog> topologyCatalog,
-      Optional<DagManager> dagManager, Optional<Logger> log, 
@Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean 
multiActiveSchedulerEnabled,
-      SchedulerLeaseAlgoHandler schedulerLeaseAlgoHandler) {
-    this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true, 
multiActiveSchedulerEnabled, schedulerLeaseAlgoHandler);
+      Optional<DagManager> dagManager, Optional<Logger> log, 
Optional<FlowTriggerHandler> schedulerLeaseAlgoHandler) {

Review Comment:
   need to rename `schedulerLeaseAlgoHandler`



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -66,24 +67,37 @@
  * host should actually complete its work while having the lease and then mark 
the flow action as NULL to indicate no
  * further leasing should be done for the event.
  */
-public class MySQLMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
+@Slf4j
+public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
   /** `j.u.Function` variant for an operation that may @throw IOException or 
SQLException: preserves method signature checked exceptions */
   @FunctionalInterface
   protected interface CheckedFunction<T, R> {
     R apply(T t) throws IOException, SQLException;
   }
 
-  public static final String CONFIG_PREFIX = "MySQLMultiActiveLeaseArbiter";
+  public static final String CONFIG_PREFIX = "MysqlMultiActiveLeaseArbiter";
 
   protected final DataSource dataSource;
   private final String leaseArbiterTableName;
   private final String constantsTableName;
   private final int epsilon;
   private final int linger;
+
+  // TODO: define retention on this table
+  private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE 
TABLE IF NOT EXISTS %S ("
+      + "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") 
NOT NULL, flow_name varchar("
+      + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + 
"flow_execution_id varchar("
+      + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, 
flow_action varchar(100) NOT NULL, "
+      + "event_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, "

Review Comment:
   suggest no default since probably shouldn't be possible to insert w/o a 
specific `event_timestamp`
   
   (the default for `lease_acq_tstamp` seems reasonable)
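
A rough sketch of what the suggestion could look like, under stated assumptions: the class name `LeaseArbiterDdlSketch`, the length constants, the full column name `lease_acquisition_timestamp`, and the primary key are hypothetical stand-ins, since the diff is cut off before the rest of the statement. Only the treatment of `event_timestamp` (required, no default) reflects the comment above.

```java
final class LeaseArbiterDdlSketch {
  // Hypothetical values standing in for the ServiceConfigKeys length constants.
  static final int MAX_FLOW_GROUP_LENGTH = 128;
  static final int MAX_FLOW_NAME_LENGTH = 128;
  static final int MAX_FLOW_EXECUTION_ID_LENGTH = 13;

  // Builds the DDL for the given table name.
  static String createTableStatement(String tableName) {
    return String.format(
        "CREATE TABLE IF NOT EXISTS %s ("
            + "flow_group varchar(%d) NOT NULL, "
            + "flow_name varchar(%d) NOT NULL, "
            + "flow_execution_id varchar(%d) NOT NULL, "
            + "flow_action varchar(100) NOT NULL, "
            // No DEFAULT: every insert must supply the event timestamp explicitly.
            + "event_timestamp TIMESTAMP NOT NULL, "
            // Assumed column name; defaulting to the insertion time seems reasonable here.
            + "lease_acquisition_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, "
            // Assumed key; the real key is not visible in the diff.
            + "PRIMARY KEY (flow_group, flow_name, flow_execution_id, flow_action))",
        tableName, MAX_FLOW_GROUP_LENGTH, MAX_FLOW_NAME_LENGTH, MAX_FLOW_EXECUTION_ID_LENGTH);
  }
}
```

The sketch uses `%s` for the table name; Java's `%S` conversion upper-cases its argument. On MySQL servers with `explicit_defaults_for_timestamp` disabled, the first `TIMESTAMP NOT NULL` column may still receive an implicit default, so the behavior is worth verifying against the target server.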



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
