[ 
https://issues.apache.org/jira/browse/GOBBLIN-1837?focusedWorklogId=865141&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-865141
 ]

ASF GitHub Bot logged work on GOBBLIN-1837:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Jun/23 09:46
            Start Date: 13/Jun/23 09:46
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3700:
URL: https://github.com/apache/gobblin/pull/3700#discussion_r1227693255


##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -96,19 +96,19 @@ public class ConfigurationKeys {
   public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = "skip.scheduling.flows.after.num.days";
   public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
   // Scheduler lease determination store configuration
-  public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = "multi.active.scheduler.constants.db.table";
-  public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE = "multi.active.scheduler.";
-  public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY = "scheduler.lease.determination.store.db.table";
-  public static final String DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = "gobblin_scheduler_lease_determination_store";
-  public static final String SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY = "reminderEventTimestampMillis";
-  public static final String SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY = "newEventTimestampMillis";
-  public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = "";
+  public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = "MysqlMultiActiveLeaseArbiter.constantsTable";
+  public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE = "MysqlMultiActiveLeaseArbiter.gobblin_multi_active_scheduler_constants_store";
+  public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY = "MysqlMultiActiveLeaseArbiter.schedulerLeaseArbiterTable";

Review Comment:
   no biggie, but for all of these, I suggest separately defining `MYSQL_LEASE_ARBITER_PREFIX` and then prepending it to each of the many keys
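A minimal sketch of the suggested refactor, assuming the prefix carries the trailing dot (the holder class `LeaseArbiterConfigKeys` is hypothetical; in the PR these would stay in `ConfigurationKeys`). Only the keys are shown. The `DEFAULT_..._DB_TABLE` values look like table names, and if they are used verbatim in SQL, MySQL would parse a dot in them as a `database.table` separator, so they may not want the prefix at all.

```java
// Sketch only: the holder class name is hypothetical; the key suffixes are copied from the diff.
final class LeaseArbiterConfigKeys {
  private LeaseArbiterConfigKeys() {}

  // Shared namespace for every MysqlMultiActiveLeaseArbiter setting (trailing dot included).
  public static final String MYSQL_LEASE_ARBITER_PREFIX = "MysqlMultiActiveLeaseArbiter.";

  // Concatenating two constants keeps each key a compile-time constant,
  // so the keys remain usable in annotations and switch labels.
  public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
      MYSQL_LEASE_ARBITER_PREFIX + "constantsTable";
  public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY =
      MYSQL_LEASE_ARBITER_PREFIX + "schedulerLeaseArbiterTable";
  public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY =
      MYSQL_LEASE_ARBITER_PREFIX + "epsilonMillis";
}
```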



##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -96,19 +96,19 @@ public class ConfigurationKeys {
+  public static final String DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = "MysqlMultiActiveLeaseArbiter.gobblin_scheduler_lease_determination_store";
+  public static final String SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY = "eventToRevisitTimestampMillis";
+  public static final String SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY = "triggerEventTimestampMillis";
+  public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = "MysqlMultiActiveLeaseArbiter.epsilonMillis";
   public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 100;

Review Comment:
   of course we'll tune this as we proceed... still, considering potential causes of late triggers, such as a full GC pause, I'd expect a larger value to be needed. I'd probably start with 5s.
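To make the trade-off concrete, here is a sketch that assumes the epsilon is the tolerance within which two trigger timestamps for the same flow action are treated as one event. The diff doesn't show how `epsilonMillis` is consumed, so `EventEpsilon` and `isSameEvent` are hypothetical. A trigger delayed 2s by a GC pause falls outside the current 100 ms default but inside the suggested 5s.

```java
// Hypothetical helper: assumes epsilon is a tolerance for treating two trigger timestamps
// of the same flow action as a single event. Not the arbiter's actual logic.
final class EventEpsilon {
  // Reviewer's suggested starting point (5s) versus the diff's current default of 100 ms.
  static final int SUGGESTED_EPSILON_MILLIS = 5_000;
  static final int CURRENT_DEFAULT_EPSILON_MILLIS = 100;

  private EventEpsilon() {}

  // Inclusive comparison: timestamps exactly epsilon apart still count as the same event.
  static boolean isSameEvent(long firstMillis, long secondMillis, long epsilonMillis) {
    return Math.abs(firstMillis - secondMillis) <= epsilonMillis;
  }
}
```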



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -19,38 +19,38 @@
 
 import java.io.IOException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.Data;
+
 
 /**
  * This interface defines a generic approach to a non-blocking, multiple active thread or host system, in which one or
- * more active instances compete over ownership of a particular flow's event. The type of flow event in question does
- * not impact the algorithm other than to uniquely identify the flow event. Each instance uses the interface to initiate
- * an attempt at ownership over the flow event and receives a response indicating the status of the attempt.
+ * more active participants compete to take responsibility for a particular flow's event. The type of flow event in
+ * question does not impact the algorithm other than to uniquely identify the flow event. Each participant uses the
+ * interface to initiate an attempt at ownership over the flow event and receives a response indicating the status of
+ * the attempt.
  *
  * At a high level the lease arbiter works as follows:
- *  1. Multiple instances receive knowledge of a flow action event to act upon
- *  2. Each instance attempts to acquire rights or `a lease` to be the sole instance acting on the event by calling the
- *      tryAcquireLease method below and receives the resulting status. The status indicates whether this instance has
- *        a) acquired the lease -> then this instance will attempt to complete the lease
- *        b) another has acquired the lease -> then another will attempt to complete the lease
- *        c) flow event no longer needs to be acted upon -> terminal state
- *  3. If another has acquired the lease, then the instance will check back in at the time of lease expiry to see if it
- *    needs to attempt the lease again [status (b) above].
- *  4. Once the instance which acquired the lease completes its work on the flow event, it calls completeLeaseUse to
- *    indicate to all other instances that the flow event no longer needs to be acted upon [status (c) above]
+ *  1. Multiple participants independently learn of a flow action event to act upon
+ *  2. Each participant attempts to acquire rights or `a lease` to be the sole participant acting on the event by
+ *     calling the tryAcquireLease method below and receives the resulting status. The status indicates whether this
+ *     participant has
+ *        a) LeaseObtainedStatus -> this participant will attempt to carry out the required action before the lease expires
+ *        b) LeasedToAnotherStatus -> another will attempt to carry out the required action before the lease expires
+ *        c) NoLongerLeasingStatus -> flow event no longer needs to be acted upon or terminal state

Review Comment:
   nit: "...acted upon (terminal state)"
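The numbered steps above can be modeled with a toy, single-process arbiter. This illustrates the protocol only and is not the MySQL-backed implementation: `InMemoryLeaseArbiter`, the string event key, and the fixed lease duration are assumptions, while the status class names mirror the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the lease protocol; names other than the status classes are hypothetical.
abstract class LeaseAttemptStatus {}

final class NoLongerLeasingStatus extends LeaseAttemptStatus {}

final class LeaseObtainedStatus extends LeaseAttemptStatus {
  final String eventKey;
  final long leaseAcquisitionTimestamp;

  LeaseObtainedStatus(String eventKey, long leaseAcquisitionTimestamp) {
    this.eventKey = eventKey;
    this.leaseAcquisitionTimestamp = leaseAcquisitionTimestamp;
  }
}

final class LeasedToAnotherStatus extends LeaseAttemptStatus {
  final long minimumLingerDurationMillis;

  LeasedToAnotherStatus(long minimumLingerDurationMillis) {
    this.minimumLingerDurationMillis = minimumLingerDurationMillis;
  }
}

final class InMemoryLeaseArbiter {
  private static final class Lease {
    long acquiredAt;
    boolean completed;
  }

  private final long leaseDurationMillis;
  private final Map<String, Lease> leases = new HashMap<>();

  InMemoryLeaseArbiter(long leaseDurationMillis) {
    this.leaseDurationMillis = leaseDurationMillis;
  }

  // Step 2: try to become the sole participant acting on the event.
  synchronized LeaseAttemptStatus tryAcquireLease(String eventKey, long nowMillis) {
    Lease lease = leases.get(eventKey);
    if (lease != null && lease.completed) {
      return new NoLongerLeasingStatus(); // (c) terminal state
    }
    if (lease != null && nowMillis < lease.acquiredAt + leaseDurationMillis) {
      // (b) another participant holds a valid lease: check back once it would expire
      return new LeasedToAnotherStatus(lease.acquiredAt + leaseDurationMillis - nowMillis);
    }
    Lease fresh = new Lease(); // no lease yet, or the previous one expired
    fresh.acquiredAt = nowMillis;
    leases.put(eventKey, fresh);
    return new LeaseObtainedStatus(eventKey, nowMillis); // (a) this participant owns it
  }

  // Succeeds only if nobody re-acquired the lease after ours expired.
  synchronized boolean recordLeaseSuccess(LeaseObtainedStatus status) {
    Lease lease = leases.get(status.eventKey);
    if (lease == null || lease.acquiredAt != status.leaseAcquisitionTimestamp) {
      return false;
    }
    lease.completed = true;
    return true;
  }
}
```

The acquisition-timestamp check in `recordLeaseSuccess` corresponds to the `@return false` case in the interface javadoc: if the lease expired and another participant re-acquired it, the original holder's work does not count.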



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -62,7 +62,38 @@ public interface MultiActiveLeaseArbiter {
    * leaseAcquisitionTimeMillis values have not changed since this owner acquired the lease (indicating the lease did
    * not expire).
    * @return true if successfully updated, indicating no further actions need to be taken regarding this event.
+   *         false if failed to update the lease properly, the caller should continue seeking to acquire the lease as
+   *         if any actions it did successfully accomplish, do not count
    */
-  boolean completeLeaseUse(DagActionStore.DagAction flowAction, long eventTimeMillis, long leaseAcquisitionTimeMillis)
-      throws IOException;
+  boolean recordLeaseSuccess(DagActionStore.DagAction flowAction, LeaseObtainedStatus status) throws IOException;
+
+  /*
+   Object used to encapsulate status of lease acquisition attempt and derived should contain information specific to

Review Comment:
   nits:
   this (and all those below) should really be class javadoc to show up in that tool
   a class (not an object)
   "derived [classes/types]" OR "derivations"

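For reference, the same comments as javadoc: only a `/** ... */` block placed directly before a declaration is picked up by the javadoc tool and IDE hover help, while a plain `/* ... */` block is ignored. The wording below folds in the other two nits; the enclosing interface is trimmed to just these nested classes.

```java
// Trimmed: only the nested status classes are shown, not the interface's methods.
interface MultiActiveLeaseArbiter {
  /**
   * Class used to encapsulate the status of a lease acquisition attempt. Derived classes should contain
   * information specific to the status that results.
   */
  abstract class LeaseAttemptStatus {}

  /** The flow action event no longer needs to be acted upon (terminal state). */
  class NoLongerLeasingStatus extends LeaseAttemptStatus {}
}
```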


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -62,7 +62,38 @@ public interface MultiActiveLeaseArbiter {
+  boolean recordLeaseSuccess(DagActionStore.DagAction flowAction, LeaseObtainedStatus status) throws IOException;

Review Comment:
   nit: to streamline this interface, I'd personally tunnel the `flowAction` as a package-protected member of `LeaseObtainedStatus`. thus, the core identification of what to record can be derived from that single param.
   
   in a way this is also more robust, since one `status` could never be mismatched w/ an unrelated `flowAction`.
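A sketch of that shape, with Lombok replaced by hand-written members so it compiles standalone. `DagAction` is trimmed to three fields, and `LeaseSuccessRecorder` is a hypothetical stand-in for the relevant slice of `MultiActiveLeaseArbiter`. In Java terms the member is package-private (no modifier), which lets an arbiter implementation in the same package read it without widening the public API.

```java
import java.io.IOException;

// Trimmed stand-in for DagActionStore.DagAction (the real class also has a FlowActionType).
final class DagAction {
  final String flowGroup;
  final String flowName;
  final String flowExecutionId;

  DagAction(String flowGroup, String flowName, String flowExecutionId) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
  }
}

final class LeaseObtainedStatus {
  // Package-private: visible to the arbiter in the same package, not to outside callers.
  final DagAction flowAction;
  private final long eventTimestamp;
  private final long leaseAcquisitionTimestamp;

  LeaseObtainedStatus(DagAction flowAction, long eventTimestamp, long leaseAcquisitionTimestamp) {
    this.flowAction = flowAction;
    this.eventTimestamp = eventTimestamp;
    this.leaseAcquisitionTimestamp = leaseAcquisitionTimestamp;
  }

  public long getEventTimestamp() { return eventTimestamp; }

  public long getLeaseAcquisitionTimestamp() { return leaseAcquisitionTimestamp; }
}

// Hypothetical slice of the arbiter interface: the single parameter identifies everything to
// record, so a status can never be paired with an unrelated flow action.
interface LeaseSuccessRecorder {
  boolean recordLeaseSuccess(LeaseObtainedStatus status) throws IOException;
}
```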



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java:
##########
@@ -35,17 +36,12 @@ enum FlowActionType {
   }
 
   @Data
+  @AllArgsConstructor
   class DagAction {
     String flowGroup;
     String flowName;
     String flowExecutionId;
     FlowActionType flowActionType;

Review Comment:
   first off, this is beautiful... nearly looks like we're writing scala! ;p
   
   secondly, in what situations do we expect to need to mutate the fields (i.e. why can't they be `final`)? in general `@Data` / POJOs work quite well being completely immutable (which is why such classes may not require `@AllArgsConstructor` in addition to `@Data`).
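For comparison: when every field is `final`, Lombok's `@Data` already implies `@RequiredArgsConstructor`, which then takes all fields, and it generates no setters, so `@AllArgsConstructor` adds nothing (`@Value` is Lombok's dedicated immutable variant). Written out by hand so it compiles without Lombok, that is roughly the class below; the `FlowActionType` constants are placeholders because the diff doesn't show them.

```java
import java.util.Objects;

// Placeholder constants; the real enum's values are not shown in the diff.
enum FlowActionType { LAUNCH, KILL }

// Immutable value class: final fields, all-args constructor, getters, value-based equality.
final class DagAction {
  private final String flowGroup;
  private final String flowName;
  private final String flowExecutionId;
  private final FlowActionType flowActionType;

  DagAction(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
    this.flowActionType = flowActionType;
  }

  String getFlowGroup() { return flowGroup; }
  String getFlowName() { return flowName; }
  String getFlowExecutionId() { return flowExecutionId; }
  FlowActionType getFlowActionType() { return flowActionType; }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof DagAction)) return false;
    DagAction that = (DagAction) o;
    return Objects.equals(flowGroup, that.flowGroup)
        && Objects.equals(flowName, that.flowName)
        && Objects.equals(flowExecutionId, that.flowExecutionId)
        && flowActionType == that.flowActionType;
  }

  @Override
  public int hashCode() {
    return Objects.hash(flowGroup, flowName, flowExecutionId, flowActionType);
  }

  @Override
  public String toString() {
    return "DagAction(flowGroup=" + flowGroup + ", flowName=" + flowName
        + ", flowExecutionId=" + flowExecutionId + ", flowActionType=" + flowActionType + ")";
  }
}
```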



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -62,7 +62,38 @@ public interface MultiActiveLeaseArbiter {
+
+  /*
+   Object used to encapsulate status of lease acquisition attempt and derived should contain information specific to
+   the status that results.
+   */
+  abstract class LeaseAttemptStatus {}
+
+  class NoLongerLeasingStatus extends LeaseAttemptStatus {}
+
+  /*
+  The participant calling this method acquired the lease for the event in question. The class contains the `eventTimestamp`
+  associated with the lease as well as the time the caller obtained the lease or `leaseAcquisitionTimestamp`.
+  */
+  @Data
+  class LeaseObtainedStatus extends LeaseAttemptStatus {
+    private final long eventTimestamp;
+    private final long leaseAcquisitionTimestamp;
+  }
+
+  /*
+  This flow action event already has a valid lease owned by another host.
+   */
+  @Data
+  class LeasedToAnotherStatus extends LeaseAttemptStatus {
+    // the timestamp the lease is associated with, but it may be a different timestamp for the same flow action
+    // (a previous participant of the event)
+    private final long eventTimeMillis;
+    // the minimum amount of time to wait before returning to check if the lease has completed or expired
+    private final long minimumLingerDurationMillis;

Review Comment:
   better to describe these in the class doc to cause them to show up in javadoc HTML and IDE help, etc.
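Folding those field comments into the class javadoc might look like the following. The wording is lightly edited from the diff's comments, and the class is written without Lombok and outside its enclosing interface (with a stub base class) so it compiles on its own.

```java
/** Base type for lease attempt outcomes (trimmed stand-in). */
abstract class LeaseAttemptStatus {}

/**
 * This flow action event already has a valid lease owned by another participant.
 *
 * <p>{@code eventTimeMillis} is the event timestamp the existing lease is associated with. It may be a different
 * timestamp for the same flow action, e.g. one from a previous participant of the event.
 *
 * <p>{@code minimumLingerDurationMillis} is the minimum amount of time to wait before checking back to see whether
 * the lease has completed or expired.
 */
final class LeasedToAnotherStatus extends LeaseAttemptStatus {
  private final long eventTimeMillis;
  private final long minimumLingerDurationMillis;

  LeasedToAnotherStatus(long eventTimeMillis, long minimumLingerDurationMillis) {
    this.eventTimeMillis = eventTimeMillis;
    this.minimumLingerDurationMillis = minimumLingerDurationMillis;
  }

  public long getEventTimeMillis() { return eventTimeMillis; }

  public long getMinimumLingerDurationMillis() { return minimumLingerDurationMillis; }
}
```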



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -30,62 +29,60 @@
 import org.quartz.JobKey;
 import org.quartz.SchedulerException;
 import org.quartz.Trigger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.typesafe.config.Config;
 
 import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.LeaseAttemptStatus;
 import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
 import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
 import org.apache.gobblin.scheduler.JobScheduler;
 import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.runtime.api.LeaseObtainedStatus;
-import org.apache.gobblin.runtime.api.LeasedToAnotherStatus;
 
 
 /**
  * Handler used to coordinate multiple hosts with enabled schedulers to respond to flow action events. It uses the
- * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter} to determine a single lease owner at a given time
+ * {@link MysqlMultiActiveLeaseArbiter} to determine a single lease owner at a given time
  * for a flow action event. After acquiring the lease, it persists the flow action event to the {@link DagActionStore}
  * to be eventually acted upon by the host with the active DagManager. Once it has completed this action, it will mark
  * the lease as completed by calling the
- * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter.completeLeaseUse} method. Hosts that do not gain
+ * MysqlMultiActiveLeaseArbiter.recordLeaseSuccess method. Hosts that do not gain
  * the lease for the event, instead schedule a reminder using the {@link SchedulerService} to check back in on the
  * previous lease owner's completion status after the lease should expire to ensure the event is handled in failure
  * cases.
  */
-public class SchedulerLeaseAlgoHandler {
-  private static final Logger LOG = LoggerFactory.getLogger(SchedulerLeaseAlgoHandler.class);
-  private final int staggerUpperBoundSec;
+@Slf4j
+public class FlowTriggerHandler {
+  private final int schedulerMaxBackoffMillis;
   private static Random random = new Random();
   protected MultiActiveLeaseArbiter multiActiveLeaseArbiter;
   protected JobScheduler jobScheduler;
   protected SchedulerService schedulerService;
   protected DagActionStore dagActionStore;
   private MetricContext metricContext;
   private ContextAwareMeter numLeasesCompleted;
+
   @Inject
-  public SchedulerLeaseAlgoHandler(Config config, MultiActiveLeaseArbiter leaseDeterminationStore,
+  // TODO: should multiActiveLeaseArbiter and DagActionStore be optional?

Review Comment:
   I wouldn't think so... but do elaborate on your thought process



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -30,62 +29,60 @@
+  public FlowTriggerHandler(Config config, MultiActiveLeaseArbiter leaseDeterminationStore,
       JobScheduler jobScheduler, SchedulerService schedulerService, DagActionStore dagActionStore) {
-    this.staggerUpperBoundSec = ConfigUtils.getInt(config,
-        ConfigurationKeys.SCHEDULER_STAGGERING_UPPER_BOUND_SEC_KEY,
-        ConfigurationKeys.DEFAULT_SCHEDULER_STAGGERING_UPPER_BOUND_SEC);
+    this.schedulerMaxBackoffMillis = ConfigUtils.getInt(config, ConfigurationKeys.SCHEDULER_MAX_BACKOFF_MILLIS_KEY,
+        ConfigurationKeys.DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS);
     this.multiActiveLeaseArbiter = leaseDeterminationStore;
     this.jobScheduler = jobScheduler;
     this.schedulerService = schedulerService;
     this.dagActionStore = dagActionStore;
     this.metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
         this.getClass());
-    this.numLeasesCompleted = metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_SCHEDULER_LEASE_ALGO_HANDLER_NUM_LEASES_COMPLETED);
+    this.numLeasesCompleted = metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_SCHEDULER_LEASE_ALGO_HANDLER_NUM_FLOWS_SUBMITTED);

Review Comment:
   looks like the constant needs renaming to `GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_SUBMITTED`
   
   (shall we update `numLeasesCompleted` as well?)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -30,62 +29,60 @@
  * the lease as completed by calling the
- * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter.completeLeaseUse} method. Hosts that do not gain
+ * MysqlMultiActiveLeaseArbiter.recordLeaseSuccess method. Hosts that do not gain

Review Comment:
   I prefer the javadoc `@link` you had previously
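When restoring it, note that javadoc expects `#` between a class and its member, so the link would read `{@link MysqlMultiActiveLeaseArbiter#recordLeaseSuccess}`. The earlier `{@link ...MySQLMultiActiveLeaseArbiter.completeLeaseUse}` used `.` (and the old `MySQL` casing), which javadoc is unlikely to resolve as a method reference; if the method is ever overloaded, the parameter types can be added, as in `#recordLeaseSuccess(Type, Type)`. A compilable illustration with a stand-in arbiter whose method signature is trimmed:

```java
// Stand-in so the {@link} below has something to resolve against; the real method takes parameters.
class MysqlMultiActiveLeaseArbiter {
  boolean recordLeaseSuccess() {
    return true;
  }
}

/**
 * Handler used to coordinate multiple hosts with enabled schedulers to respond to flow action events.
 * Once it has persisted the flow action, it marks the lease as completed by calling
 * {@link MysqlMultiActiveLeaseArbiter#recordLeaseSuccess}. Hosts that do not gain the lease for the event
 * instead schedule a reminder to check back in after the lease should expire.
 */
class FlowTriggerHandler {}
```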



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config, MultiActiveLeaseArbiter leaseDet
    * @param eventTimeMillis
    * @throws IOException
    */
-  public void handleNewSchedulerEvent(Properties jobProps, DagActionStore.DagAction flowAction, long eventTimeMillis)
+  public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flowAction, long eventTimeMillis)
       throws IOException {
-    LeaseAttemptStatus leaseAttemptStatus =
+    MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
         multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
     // TODO: add a log event or metric for each of these cases
-    switch (leaseAttemptStatus.getClass().getSimpleName()) {
-      case "LeaseObtainedStatus":
-        finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
-        break;
-      case "LeasedToAnotherStatus":
-        scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus) leaseAttemptStatus, flowAction, eventTimeMillis);
-        break;
-      case "NoLongerLeasingStatus":
-        break;
-      default:
+    if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+      persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus, flowAction);
+      return;
+    } else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+      scheduleReminderForEvent(jobProps, (MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+          eventTimeMillis);
+    } else if (leaseAttemptStatus instanceof  MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+      return;
     }
+    log.warn("Received type of leaseAttemptStatus: {} not handled by this method", leaseAttemptStatus.getClass().getName());

Review Comment:
   since this represents a gaping hole in our impl, it's actually more appropriate to scream/panic/freak out w/ a `RuntimeException`, than it is to presume to continue (as if completely shirking all responsibility for the `flowAction`)
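A sketch of that shape, with trimmed stand-ins for the nested status classes and string labels in place of the real calls (`TriggerDispatch` and the labels are hypothetical). Every handled branch returns, so the fallback can only be reached by an unknown subtype. In the quoted diff, the `LeasedToAnotherStatus` branch has no `return`, so as written it would also fall through to the warning.

```java
// Trimmed stand-ins for the nested status classes in MultiActiveLeaseArbiter.
abstract class LeaseAttemptStatus {}

final class LeaseObtainedStatus extends LeaseAttemptStatus {}

final class LeasedToAnotherStatus extends LeaseAttemptStatus {}

final class NoLongerLeasingStatus extends LeaseAttemptStatus {}

final class TriggerDispatch {
  private TriggerDispatch() {}

  // Returns a label for the branch taken so the control flow is easy to check.
  static String handle(LeaseAttemptStatus status) {
    if (status instanceof LeaseObtainedStatus) {
      return "persist"; // would call persistFlowAction(...)
    } else if (status instanceof LeasedToAnotherStatus) {
      return "remind"; // would call scheduleReminderForEvent(...)
    } else if (status instanceof NoLongerLeasingStatus) {
      return "done"; // nothing left to do for this event
    }
    // Fail loudly: silently returning would drop responsibility for the flow action.
    throw new RuntimeException("Unhandled LeaseAttemptStatus type: " + status.getClass().getName());
  }
}
```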



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config, MultiActiveLeaseArbiter leaseDet
   }
 
   // Called after obtaining a lease to persist the flow action to {@link DagActionStore} and mark the lease as done
-  private boolean finalizeLease(LeaseObtainedStatus status, DagActionStore.DagAction flowAction) {
+  private boolean persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status, DagActionStore.DagAction flowAction) {
     try {
       this.dagActionStore.addDagAction(flowAction.getFlowGroup(), flowAction.getFlowName(),
           flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
-      if (this.dagActionStore.exists(flowAction.getFlowGroup(), flowAction.getFlowName(),
-          flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
-        // If the flow action has been persisted to the {@link DagActionStore} we can close the lease
-        this.numLeasesCompleted.mark();
-        return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction, status.getEventTimestamp(),
-            status.getMyLeaseAcquisitionTimestamp());
-      }
-    } catch (IOException | SQLException e) {
+      // If the flow action has been persisted to the {@link DagActionStore} we can close the lease
+      this.numLeasesCompleted.mark();
+      return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction, status);
+    } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    // TODO: should this return an error or print a warning log if failed to commit to dag action store?
-    return false;
   }
 
   /**
-   * This method is used by {@link SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for itself
-   * to check on the other participant's progress to finish acting on a flow action after the time the lease should
-   * expire.
+   * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to schedule a reminder for itself to check on
+   * the other participant's progress to finish acting on a flow action after the time the lease should expire.
    * @param jobProps
    * @param status used to extract event to be reminded for and the minimum time after which reminder should occur
    * @param originalEventTimeMillis the event timestamp we were originally handling
    * @param flowAction
    */
-  private void scheduleReminderForEvent(Properties jobProps, LeasedToAnotherStatus status,
+  private void scheduleReminderForEvent(Properties jobProps, MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
       DagActionStore.DagAction flowAction, long originalEventTimeMillis) {
     // Add a small randomization to the minimum reminder wait time to avoid 'thundering herd' issue
-    String cronExpression = createCronFromDelayPeriod(status.getMinimumReminderWaitMillis() + random.nextInt(staggerUpperBoundSec));
+    String cronExpression = createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
+        + random.nextInt(schedulerMaxBackoffMillis));
     jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
-    // Ensure we save the event timestamp that we're setting reminder for, in addition to our own event timestamp which may be different
-    jobProps.setProperty(ConfigurationKeys.SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY, String.valueOf(status.getReminderEventTimeMillis()));
-    jobProps.setProperty(ConfigurationKeys.SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY, String.valueOf(status.getReminderEventTimeMillis()));
+    // Ensure we save the event timestamp that we're setting reminder for to have for debugging purposes
+    // in addition to the event we want to initiate
+    jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
+        String.valueOf(status.getEventTimeMillis()));
+    jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
+        String.valueOf(originalEventTimeMillis));
     JobKey key = new JobKey(flowAction.getFlowName(), flowAction.getFlowGroup());
-    Trigger trigger = this.jobScheduler.getTrigger(key, jobProps);
+    // Create a new trigger for the flow in job scheduler that is set to fire at the minimum reminder wait time calculated
+    Trigger trigger = this.jobScheduler.createTriggerForJob(key, jobProps);
     try {
-      LOG.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] -  attempting to schedule reminder for event %s in %s millis",
-          flowAction, originalEventTimeMillis, status.getReminderEventTimeMillis(), trigger.getNextFireTime());
+      log.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] -  attempting to schedule reminder for event %s in %s millis",

Review Comment:
   update to "Flow Trigger Handler -"
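Beyond the rename, two details in this block may be worth a look. SLF4J's `log.info` substitutes `{}` placeholders, not `%s`, so as written the specifiers would be logged literally and the arguments would not appear (also, Quartz's `Trigger.getNextFireTime()` returns a `java.util.Date`, not a duration in millis). Separately, `Random.nextInt(bound)` throws `IllegalArgumentException` unless `bound` is positive, so `schedulerMaxBackoffMillis` needs to be at least 1; the switch to millis also fixes the old code adding a seconds-based bound to a millis wait. A hypothetical helper showing the jittered delay and the renamed message (built with `String.format` so the sketch needs no SLF4J dependency; falling back to no jitter for a non-positive bound is a design choice of this sketch):

```java
import java.util.Random;

// Hypothetical helper mirroring scheduleReminderForEvent's delay math; all values are in millis.
final class ReminderDelay {
  private ReminderDelay() {}

  static long computeDelayMillis(long minimumLingerDurationMillis, int maxBackoffMillis, Random random) {
    if (maxBackoffMillis <= 0) {
      // Random.nextInt(bound) rejects a non-positive bound, so skip the jitter instead.
      return minimumLingerDurationMillis;
    }
    // Uniform jitter in [0, maxBackoffMillis) spreads participants out to avoid a thundering herd.
    return minimumLingerDurationMillis + random.nextInt(maxBackoffMillis);
  }

  // With SLF4J, write the same message using {} placeholders instead of %s.
  static String reminderLogMessage(String flowAction, long originalEventTimeMillis,
      long eventToRevisitMillis, long delayMillis) {
    return String.format("Flow Trigger Handler - [%s, eventTimestamp: %s] - attempting to schedule reminder "
        + "for event %s in %s millis", flowAction, originalEventTimeMillis, eventToRevisitMillis, delayMillis);
  }
}
```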



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config, MultiActiveLeaseArbiter leaseDet
   // Called after obtaining a lease to persist the flow action to {@link 
DagActionStore} and mark the lease as done
-  private boolean finalizeLease(LeaseObtainedStatus status, 
DagActionStore.DagAction flowAction) {
+  private boolean 
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status, 
DagActionStore.DagAction flowAction) {
     try {
       this.dagActionStore.addDagAction(flowAction.getFlowGroup(), 
flowAction.getFlowName(),
           flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
-      if (this.dagActionStore.exists(flowAction.getFlowGroup(), 
flowAction.getFlowName(),
-          flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
-        // If the flow action has been persisted to the {@link DagActionStore} 
we can close the lease
-        this.numLeasesCompleted.mark();
-        return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction, 
status.getEventTimestamp(),
-            status.getMyLeaseAcquisitionTimestamp());
-      }
-    } catch (IOException | SQLException e) {
+      // If the flow action has been persisted to the {@link DagActionStore} 
we can close the lease
+      this.numLeasesCompleted.mark();
+      return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction, 
status);
+    } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    // TODO: should this return an error or print a warning log if failed to 
commit to dag action store?
-    return false;
   }
 
   /**
-   * This method is used by {@link 
SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for 
itself
-   * to check on the other participant's progress to finish acting on a flow 
action after the time the lease should
-   * expire.
+   * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to 
schedule a reminder for itself to check on
+   * the other participant's progress to finish acting on a flow action after 
the time the lease should expire.
    * @param jobProps
    * @param status used to extract event to be reminded for and the minimum 
time after which reminder should occur
    * @param originalEventTimeMillis the event timestamp we were originally 
handling
    * @param flowAction
    */
-  private void scheduleReminderForEvent(Properties jobProps, 
LeasedToAnotherStatus status,
+  private void scheduleReminderForEvent(Properties jobProps, 
MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
       DagActionStore.DagAction flowAction, long originalEventTimeMillis) {
     // Add a small randomization to the minimum reminder wait time to avoid 
'thundering herd' issue
-    String cronExpression = 
createCronFromDelayPeriod(status.getMinimumReminderWaitMillis() + 
random.nextInt(staggerUpperBoundSec));
+    String cronExpression = 
createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
+        + random.nextInt(schedulerMaxBackoffMillis));
     jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
-    // Ensure we save the event timestamp that we're setting reminder for, in 
addition to our own event timestamp which may be different
-    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY,
 String.valueOf(status.getReminderEventTimeMillis()));
-    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY,
 String.valueOf(status.getReminderEventTimeMillis()));
+    // Ensure we save the event timestamp that we're setting reminder for to 
have for debugging purposes
+    // in addition to the event we want to initiate
+    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
+        String.valueOf(status.getEventTimeMillis()));
+    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
+        String.valueOf(originalEventTimeMillis));
     JobKey key = new JobKey(flowAction.getFlowName(), 
flowAction.getFlowGroup());
-    Trigger trigger = this.jobScheduler.getTrigger(key, jobProps);
+    // Create a new trigger for the flow in job scheduler that is set to fire 
at the minimum reminder wait time calculated
+    Trigger trigger = this.jobScheduler.createTriggerForJob(key, jobProps);
     try {
-      LOG.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] -  
attempting to schedule reminder for event %s in %s millis",
-          flowAction, originalEventTimeMillis, 
status.getReminderEventTimeMillis(), trigger.getNextFireTime());
+      log.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] -  
attempting to schedule reminder for event %s in %s millis",
+          flowAction, originalEventTimeMillis, status.getEventTimeMillis(), 
trigger.getNextFireTime());
       this.schedulerService.getScheduler().scheduleJob(trigger);
     } catch (SchedulerException e) {
-      LOG.warn("Failed to add job reminder due to SchedulerException for job 
%s trigger event %s ", key, status.getReminderEventTimeMillis(), e);
+      log.warn("Failed to add job reminder due to SchedulerException for job 
%s trigger event %s ", key, status.getEventTimeMillis(), e);
     }
-    LOG.info(String.format("Scheduler Lease Algo Handler - [%s, 
eventTimestamp: %s] - SCHEDULED REMINDER for event %s in %s millis",
-        flowAction, originalEventTimeMillis, 
status.getReminderEventTimeMillis(), trigger.getNextFireTime()));
+    log.info(String.format("Scheduler Lease Algo Handler - [%s, 
eventTimestamp: %s] - SCHEDULED REMINDER for event %s in %s millis",

Review Comment:
   (also here)
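   The reminder path above builds its cron expression from the minimum linger duration plus `random.nextInt(schedulerMaxBackoffMillis)`. The body of `createCronFromDelayPeriod` is not part of this hunk, so the following is only a sketch under assumptions. It targets Quartz's seven-field cron syntax (seconds, minutes, hours, day-of-month, month, day-of-week, year) for a one-time fire, and `ReminderCron`, `jitteredDelayMillis` and `cronForDelay` are hypothetical names. It also shows one pitfall: `Random.nextInt(bound)` throws `IllegalArgumentException` when `bound <= 0`, so a zero backoff needs a guard.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Random;

// Hypothetical helpers, not the FlowTriggerHandler implementation.
final class ReminderCron {
  private ReminderCron() {}

  // Base linger duration plus a uniform random stagger in [0, maxBackoffMillis).
  // Random.nextInt(bound) rejects bound <= 0, so skip the jitter in that case.
  static long jitteredDelayMillis(long minLingerMillis, int maxBackoffMillis, Random random) {
    return minLingerMillis + (maxBackoffMillis > 0 ? random.nextInt(maxBackoffMillis) : 0);
  }

  // Quartz-style cron expression matching exactly one instant: now + delayMillis,
  // truncated to the second. "?" is required in day-of-week when day-of-month is set.
  static String cronForDelay(Instant now, long delayMillis, ZoneId zone) {
    ZonedDateTime fireTime = now.plusMillis(delayMillis).atZone(zone);
    return String.format("%d %d %d %d %d ? %d",
        fireTime.getSecond(), fireTime.getMinute(), fireTime.getHour(),
        fireTime.getDayOfMonth(), fireTime.getMonthValue(), fireTime.getYear());
  }
}
```

   For example, a 90-second delay from 2023-06-13T09:46:00Z in UTC yields `30 47 9 13 6 ? 2023`. Because the expression pins a year, it can fire at most once.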



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config, 
MultiActiveLeaseArbiter leaseDet
    * @param eventTimeMillis
    * @throws IOException
    */
-  public void handleNewSchedulerEvent(Properties jobProps, 
DagActionStore.DagAction flowAction, long eventTimeMillis)
+  public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction 
flowAction, long eventTimeMillis)
       throws IOException {
-    LeaseAttemptStatus leaseAttemptStatus =
+    MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
         multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
     // TODO: add a log event or metric for each of these cases
-    switch (leaseAttemptStatus.getClass().getSimpleName()) {
-      case "LeaseObtainedStatus":
-        finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
-        break;
-      case "LeasedToAnotherStatus":
-        scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus) 
leaseAttemptStatus, flowAction, eventTimeMillis);
-        break;
-      case "NoLongerLeasingStatus":
-        break;
-      default:
+    if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+      persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus, flowAction);
+      return;
+    } else if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+      scheduleReminderForEvent(jobProps, 
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+          eventTimeMillis);
+    } else if (leaseAttemptStatus instanceof  
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+      return;
     }
+    log.warn("Received type of leaseAttemptStatus: {} not handled by this 
method", leaseAttemptStatus.getClass().getName());
   }
 
   // Called after obtaining a lease to persist the flow action to {@link 
DagActionStore} and mark the lease as done
-  private boolean finalizeLease(LeaseObtainedStatus status, 
DagActionStore.DagAction flowAction) {
+  private boolean 
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status, 
DagActionStore.DagAction flowAction) {
     try {
       this.dagActionStore.addDagAction(flowAction.getFlowGroup(), 
flowAction.getFlowName(),
           flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
-      if (this.dagActionStore.exists(flowAction.getFlowGroup(), 
flowAction.getFlowName(),
-          flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
-        // If the flow action has been persisted to the {@link DagActionStore} 
we can close the lease
-        this.numLeasesCompleted.mark();
-        return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction, 
status.getEventTimestamp(),
-            status.getMyLeaseAcquisitionTimestamp());
-      }
-    } catch (IOException | SQLException e) {
+      // If the flow action has been persisted to the {@link DagActionStore} 
we can close the lease
+      this.numLeasesCompleted.mark();
+      return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction, 
status);
+    } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    // TODO: should this return an error or print a warning log if failed to 
commit to dag action store?
-    return false;
   }
 
   /**
-   * This method is used by {@link 
SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for 
itself
-   * to check on the other participant's progress to finish acting on a flow 
action after the time the lease should
-   * expire.
+   * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to 
schedule a reminder for itself to check on
+   * the other participant's progress to finish acting on a flow action after 
the time the lease should expire.
    * @param jobProps
    * @param status used to extract event to be reminded for and the minimum 
time after which reminder should occur
    * @param originalEventTimeMillis the event timestamp we were originally 
handling
    * @param flowAction
    */
-  private void scheduleReminderForEvent(Properties jobProps, 
LeasedToAnotherStatus status,
+  private void scheduleReminderForEvent(Properties jobProps, 
MultiActiveLeaseArbiter.LeasedToAnotherStatus status,

Review Comment:
   it may be necessary to overload this w/ a form taking `LeaseObtainedStatus`...
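   One way the overload could look, sketched under assumptions. Each overload extracts what its status type provides and delegates to a single shared private method. `LeasedToAnotherStatus` and `LeaseObtainedStatus` below are simplified, hypothetical stand-ins whose fields are not taken from the real `MultiActiveLeaseArbiter` classes. Because the obtained-lease status carries no linger duration here, the caller supplies the delay. `Reminder` and `ReminderScheduling` are illustrative names.

```java
// Hypothetical, simplified stand-ins for the two MultiActiveLeaseArbiter status types.
final class LeasedToAnotherStatus {
  final long eventTimeMillis;
  final long minimumLingerDurationMillis;

  LeasedToAnotherStatus(long eventTimeMillis, long minimumLingerDurationMillis) {
    this.eventTimeMillis = eventTimeMillis;
    this.minimumLingerDurationMillis = minimumLingerDurationMillis;
  }
}

final class LeaseObtainedStatus {
  final long eventTimeMillis;

  LeaseObtainedStatus(long eventTimeMillis) {
    this.eventTimeMillis = eventTimeMillis;
  }
}

// What a scheduled reminder carries; in the handler these values end up in jobProps.
final class Reminder {
  final long eventToRevisitMillis;
  final long eventToTriggerMillis;
  final long delayMillis;

  Reminder(long eventToRevisitMillis, long eventToTriggerMillis, long delayMillis) {
    this.eventToRevisitMillis = eventToRevisitMillis;
    this.eventToTriggerMillis = eventToTriggerMillis;
    this.delayMillis = delayMillis;
  }
}

final class ReminderScheduling {
  private ReminderScheduling() {}

  // Another participant holds the lease: wait at least its minimum linger duration.
  static Reminder scheduleReminderForEvent(LeasedToAnotherStatus status, long originalEventTimeMillis) {
    return schedule(status.eventTimeMillis, originalEventTimeMillis, status.minimumLingerDurationMillis);
  }

  // We hold the lease but could not finish (e.g. persisting failed): revisit after a caller-chosen delay.
  static Reminder scheduleReminderForEvent(LeaseObtainedStatus status, long originalEventTimeMillis,
      long delayMillis) {
    return schedule(status.eventTimeMillis, originalEventTimeMillis, delayMillis);
  }

  // Shared core that both overloads delegate to.
  private static Reminder schedule(long eventToRevisitMillis, long eventToTriggerMillis, long delayMillis) {
    return new Reminder(eventToRevisitMillis, eventToTriggerMillis, delayMillis);
  }
}
```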



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java:
##########
@@ -55,31 +55,27 @@ public void resume(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, E
     try {
       // If an existing resume request is still pending then do not accept this request
       if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME)) {
-        this.handleException(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME,
-            new RuntimeException("There is already a pending RESUME action for this flow. Please wait to resubmit and wait for"
-                + " action to be completed."));
+        this.handleException("There is already a pending RESUME action for this flow. Please wait to resubmit and wait "

Review Comment:
   nit: `handleError` / `prepareError` (?)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -166,8 +165,8 @@ public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
       Config config,
       Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog,
       Orchestrator orchestrator, SchedulerService schedulerService, Optional<UserQuotaManager> quotaManager, Optional<Logger> log,
-      @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean warmStandbyEnabled, @Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean multiActiveSchedulerEnabled,
-      SchedulerLeaseAlgoHandler schedulerLeaseAlgoHandler) throws Exception {
+      @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean warmStandbyEnabled,
+      Optional<FlowTriggerHandler> schedulerLeaseAlgoHandler) throws Exception {

Review Comment:
   rename `schedulerLeaseAlgoHandler`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java:
##########
@@ -55,31 +55,27 @@ public void resume(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, E
     try {
       // If an existing resume request is still pending then do not accept this request
       if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME)) {
-        this.handleException(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME,
-            new RuntimeException("There is already a pending RESUME action for this flow. Please wait to resubmit and wait for"
-                + " action to be completed."));
+        this.handleException("There is already a pending RESUME action for this flow. Please wait to resubmit and wait "
+            + "for action to be completed.", HttpStatus.S_409_CONFLICT);
         return;
       }
       this.dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME);
     } catch (IOException | SQLException e) {
       log.warn(
           String.format("Failed to add execution resume action for flow %s %s %s to dag action store due to", flowGroup,
               flowName, flowExecutionId), e);
-      this.handleException(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME, e);
+      this.handleException(e.getMessage(), HttpStatus.S_500_INTERNAL_SERVER_ERROR);
     }
 
   }
 
-  private void handleException (String flowGroup, String flowName, String flowExecutionId, DagActionStore.FlowActionType flowActionType, Exception e) {
-    try {
-      if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId, flowActionType)) {
-        throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, e.getMessage());
-      } else {
-        throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
-      }
-    } catch (IOException | SQLException ex) {
-      throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
+  private void handleException(String exceptionMessage, HttpStatus errorType) {
+    if (errorType == HttpStatus.S_409_CONFLICT) {
+      throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, exceptionMessage);
+    } else if (errorType == HttpStatus.S_400_BAD_REQUEST) {
+      new UpdateResponse(HttpStatus.S_400_BAD_REQUEST);

Review Comment:
   the other two throw... should this not?
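   A sketch of one way to make every branch consistent: the helper builds and returns the exception, and each call site writes `throw prepareError(...)`. With that shape no branch can construct a value and then drop it, and the compiler also treats the code after each call site as unreachable. Assumptions: `ServiceError` and the local `HttpStatus` enum are hypothetical stand-ins for Rest.li's `RestLiServiceException` and `HttpStatus`, so the example runs without Rest.li. `prepareError` borrows one of the names floated in the earlier nit.

```java
// Hypothetical stand-ins so the sketch compiles without Rest.li on the classpath.
enum HttpStatus { S_400_BAD_REQUEST, S_409_CONFLICT, S_500_INTERNAL_SERVER_ERROR }

final class ServiceError extends RuntimeException {
  final HttpStatus status;

  ServiceError(HttpStatus status, String message) {
    super(message);
    this.status = status;
  }
}

final class ErrorHandling {
  private ErrorHandling() {}

  // Returns the exception instead of throwing it; call sites write `throw prepareError(...)`.
  static ServiceError prepareError(String message, HttpStatus status) {
    switch (status) {
      case S_400_BAD_REQUEST:
      case S_409_CONFLICT:
        return new ServiceError(status, message);
      default:
        // Anything else is reported as a server-side failure.
        return new ServiceError(HttpStatus.S_500_INTERNAL_SERVER_ERROR, message);
    }
  }
}
```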



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -66,24 +67,37 @@
  * host should actually complete its work while having the lease and then mark the flow action as NULL to indicate no
  * further leasing should be done for the event.
  */
-public class MySQLMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
+@Slf4j
+public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
   /** `j.u.Function` variant for an operation that may @throw IOException or SQLException: preserves method signature checked exceptions */
   @FunctionalInterface
   protected interface CheckedFunction<T, R> {
     R apply(T t) throws IOException, SQLException;
   }
 
-  public static final String CONFIG_PREFIX = "MySQLMultiActiveLeaseArbiter";
+  public static final String CONFIG_PREFIX = "MysqlMultiActiveLeaseArbiter";

Review Comment:
   better to define the prefix in `ConfigurationKeys`
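   A minimal sketch of the suggestion, using trimmed-down stand-ins rather than the real classes. The prefix is declared once in `ConfigurationKeys`, and both the derived key and the arbiter's `CONFIG_PREFIX` reuse it. Only the prefix value and the `constantsTable` key string come from this PR; the constant name `MYSQL_LEASE_ARBITER_PREFIX` is hypothetical. Concatenating compile-time constant strings yields another compile-time constant, so derived keys can still appear in annotations and `switch` labels.

```java
// Trimmed-down stand-in for ConfigurationKeys; MYSQL_LEASE_ARBITER_PREFIX is a hypothetical name.
final class ConfigurationKeys {
  private ConfigurationKeys() {}

  // Single source of truth for the arbiter's configuration namespace.
  static final String MYSQL_LEASE_ARBITER_PREFIX = "MysqlMultiActiveLeaseArbiter";

  // Keys derived from the prefix instead of repeating the literal.
  static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
}

// Trimmed-down stand-in for the arbiter: it reuses the shared constant rather than redeclaring it.
final class MysqlMultiActiveLeaseArbiter {
  static final String CONFIG_PREFIX = ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX;
}
```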



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -199,23 +197,27 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
       int leaseValidityStatus = resultSet.getInt(4);

Review Comment:
   hopefully doesn't feel like overkill, but I'd abstract this by defining a `static` inner `@Data` class with an overloaded constructor (or `static` factory method) taking a `ResultSet`
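   A sketch of that abstraction, with assumptions stated up front. Lombok's `@Data` is replaced by a hand-written immutable class so the example compiles without Lombok. The column layout is hypothetical, except that column 4 holds the lease validity status, which is what the hunk reads. `LeaseRow` and `fromResultSet` are illustrative names. The factory reads the row the cursor currently points at, so the caller still calls `next()` first.

```java
import java.sql.ResultSet;
import java.sql.SQLException;

// Hypothetical row holder; only column 4 (lease validity status) is taken from the hunk.
final class LeaseRow {
  final long eventTimestampMillis;
  final long leaseAcquisitionTimestampMillis;
  final long lingerDurationMillis;
  final int leaseValidityStatus;

  LeaseRow(long eventTimestampMillis, long leaseAcquisitionTimestampMillis,
      long lingerDurationMillis, int leaseValidityStatus) {
    this.eventTimestampMillis = eventTimestampMillis;
    this.leaseAcquisitionTimestampMillis = leaseAcquisitionTimestampMillis;
    this.lingerDurationMillis = lingerDurationMillis;
    this.leaseValidityStatus = leaseValidityStatus;
  }

  // Static factory: reads the current row; the caller must already have advanced the cursor.
  static LeaseRow fromResultSet(ResultSet resultSet) throws SQLException {
    return new LeaseRow(
        resultSet.getLong(1),  // hypothetical: event timestamp
        resultSet.getLong(2),  // hypothetical: lease acquisition timestamp
        resultSet.getLong(3),  // hypothetical: linger duration
        resultSet.getInt(4));  // lease validity status, as read in tryAcquireLease
  }
}
```

   Call sites then deal with named fields such as `row.leaseValidityStatus` instead of positional `getInt(4)` reads scattered through `tryAcquireLease`.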



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -100,8 +114,8 @@ public class MySQLMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
   // Insert or update row to acquire lease if values have not changed since the previous read
   // Need to define three separate statements to handle cases where row does not exist or has null values to check
   protected static final String CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s "
-      + "(flow_group, flow_name, flow_execution_id, flow_action, event_timestamp) VALUES (?, ?, ?, ?, ?) WHERE NOT "
-      + "EXISTS (SELECT * FROM %s " + WHERE_CLAUSE_TO_MATCH_KEY + "); " + SELECT_AFTER_INSERT_STATEMENT;
+      + "(flow_group, flow_name, flow_execution_id, flow_action, event_timestamp) VALUES (?, ?, ?, ?, ?); "
+      + SELECT_AFTER_INSERT_STATEMENT;

Review Comment:
   just thinking... it may be clearer not to append this to the constant, but rather to catenate them at point of use
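   A sketch of the suggestion: the INSERT stays a constant of its own, and the follow-up SELECT is appended only where the statement is prepared, so readers see the composition at the call site. Assumptions: the text of `SELECT_AFTER_INSERT_STATEMENT` is not shown in this hunk, so the one below is a hypothetical placeholder. `LeaseStatements`, `conditionallyAcquireLeaseIfNewRow` and the table name in the usage are illustrative too.

```java
// Hypothetical holder for the statements; only the INSERT text comes from the diff.
final class LeaseStatements {
  private LeaseStatements() {}

  static final String CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s "
      + "(flow_group, flow_name, flow_execution_id, flow_action, event_timestamp) VALUES (?, ?, ?, ?, ?); ";

  // Hypothetical placeholder; the real SELECT_AFTER_INSERT_STATEMENT is defined elsewhere in the arbiter.
  static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT ROW_COUNT();";

  // Point of use: the two statements are joined here, visibly, rather than inside a constant.
  static String conditionallyAcquireLeaseIfNewRow(String tableName) {
    return String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, tableName)
        + SELECT_AFTER_INSERT_STATEMENT;
  }
}
```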



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -579,11 +579,10 @@ public void run() {
       }
     }
 
-    private void clearUpDagAction(DagId dagId) throws IOException {
+    private void clearUpDagAction(DagId dagId, DagActionStore.FlowActionType flowActionType) throws IOException {

Review Comment:
   nit: "clear up" seems indirect and w/ a wide variety of connotations (from misunderstandings and weather... to skin care).
   
   what are we doing here?  "resolving" the action?  "dismissing"?  "deleting"?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java:
##########
@@ -581,7 +581,7 @@ public void close() throws IOException {
   /**
    * Get a {@link org.quartz.Trigger} from the given job configuration properties.
    */
-  public Trigger getTrigger(JobKey jobKey, Properties jobProps) {
+  public Trigger createTriggerForJob(JobKey jobKey, Properties jobProps) {

Review Comment:
   couldn't this be `static`?  if so, let's!



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -579,11 +579,10 @@ public void run() {
       }
     }
 
-    private void clearUpDagAction(DagId dagId) throws IOException {
+    private void clearUpDagAction(DagId dagId, DagActionStore.FlowActionType flowActionType) throws IOException {
       if (this.dagActionStore.isPresent()) {
         this.dagActionStore.get().deleteDagAction(
-            new DagActionStore.DagAction(dagId.flowGroup, dagId.flowName, dagId.flowExecutionId,
-                DagActionStore.FlowActionType.KILL));
+            new DagActionStore.DagAction(dagId.flowGroup, dagId.flowName, dagId.flowExecutionId, flowActionType));

Review Comment:
   minor, but depending on how prevalent this is, it might be reasonable to supply an additional constructor:
   ```
   public DagAction(DagId dagId, FlowActionType flowActionType) { ... }
   ```
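   A runnable sketch of the suggested constructor. It delegates to the existing four-argument form, so the field unpacking lives in one place. `DagId`, `DagAction` and `FlowActionType` below are simplified, hypothetical stand-ins for the real classes. Representing `flowExecutionId` as a `String` is an assumption, and only the `KILL` and `RESUME` action types that appear in this review are modeled.

```java
// Simplified, hypothetical stand-ins for DagManager.DagId and DagActionStore.DagAction.
enum FlowActionType { KILL, RESUME }

final class DagId {
  final String flowGroup;
  final String flowName;
  final String flowExecutionId;

  DagId(String flowGroup, String flowName, String flowExecutionId) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
  }
}

final class DagAction {
  final String flowGroup;
  final String flowName;
  final String flowExecutionId;
  final FlowActionType flowActionType;

  DagAction(String flowGroup, String flowName, String flowExecutionId, FlowActionType flowActionType) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
    this.flowActionType = flowActionType;
  }

  // Convenience constructor from the review: unpacks the DagId and delegates to the full form.
  DagAction(DagId dagId, FlowActionType flowActionType) {
    this(dagId.flowGroup, dagId.flowName, dagId.flowExecutionId, flowActionType);
  }
}
```

   With it, the call in `clearUpDagAction` shrinks to `new DagActionStore.DagAction(dagId, flowActionType)`.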



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -106,22 +104,20 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
   private FlowStatusGenerator flowStatusGenerator;
 
   private UserQuotaManager quotaManager;
-  private boolean isMultiActiveSchedulerEnabled;
-  private SchedulerLeaseAlgoHandler schedulerLeaseAlgoHandler;
+  private Optional<FlowTriggerHandler> schedulerLeaseAlgoHandler;

Review Comment:
   rename `schedulerLeaseAlgoHandler`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -317,26 +312,27 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
       }
 
       // If multi-active scheduler is enabled do not pass onto DagManager, otherwise scheduler forwards it directly
-      if (this.isMultiActiveSchedulerEnabled) {
+      if (schedulerLeaseAlgoHandler.isPresent()) {
+        // If triggerTimestampMillis is 0, then it was not set by the job trigger handler, and we cannot handle this event
+        if (triggerTimestampMillis == 0L) {
+          _log.warn("Skipping execution of spec: {} because missing trigger timestamp in job properties",
+              jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+          flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow orchestration skipped because no trigger timestamp "
+              + "associated with flow action.");
+          if (this.eventSubmitter.isPresent()) {
+            new TimingEvent(this.eventSubmitter.get(), TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);

Review Comment:
   tip: `ifPresent()`
   
   (note: this works easily and naturally... unless checked exceptions are involved)
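   A sketch of the tip and its caveat, assuming `eventSubmitter` is a `java.util.Optional`. Guava's `com.google.common.base.Optional` has no `ifPresent`; recent Guava versions offer `toJavaUtil()` to bridge to it. `EventSink` is a hypothetical stand-in for the event submitter. The second method shows the checked-exception case: the lambda passed to `ifPresent` is a `Consumer`, which cannot throw `IOException`, so the exception must be caught and rethrown unchecked, or the code must stay in an `isPresent()` block.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the event submitter.
final class EventSink {
  final List<String> events = new ArrayList<>();

  void submit(String event) {
    events.add(event);
  }

  void submitOrFail(String event) throws IOException {
    if (event.isEmpty()) {
      throw new IOException("empty event");
    }
    events.add(event);
  }
}

final class IfPresentDemo {
  private IfPresentDemo() {}

  // Unchecked work: ifPresent replaces the isPresent()/get() pair directly.
  static void emitFailure(Optional<EventSink> sink, String event) {
    sink.ifPresent(s -> s.submit(event));
  }

  // Checked work: Consumer.accept cannot throw IOException, so wrap it in UncheckedIOException.
  static void emitFailureChecked(Optional<EventSink> sink, String event) {
    sink.ifPresent(s -> {
      try {
        s.submitOrFail(event);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    });
  }
}
```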



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config, MultiActiveLeaseArbiter leaseDet
    * @param eventTimeMillis
    * @throws IOException
    */
-  public void handleNewSchedulerEvent(Properties jobProps, DagActionStore.DagAction flowAction, long eventTimeMillis)
+  public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flowAction, long eventTimeMillis)
       throws IOException {
-    LeaseAttemptStatus leaseAttemptStatus =
+    MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
         multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
     // TODO: add a log event or metric for each of these cases
-    switch (leaseAttemptStatus.getClass().getSimpleName()) {
-      case "LeaseObtainedStatus":
-        finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
-        break;
-      case "LeasedToAnotherStatus":
-        scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus) leaseAttemptStatus, flowAction, eventTimeMillis);
-        break;
-      case "NoLongerLeasingStatus":
-        break;
-      default:
+    if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+      persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus, flowAction);

Review Comment:
   shouldn't we confirm the return value to determine whether to schedule a reminder?
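   A sketch of acting on the return value, combined with an `instanceof` dispatch in which every handled branch returns before the fallback warning. In the hunk as shown, the `LeasedToAnotherStatus` branch has no `return`, so it also reaches the `log.warn` meant for unknown types. Assumptions: the status types are simplified, hypothetical stand-ins, a `BooleanSupplier` replaces `persistFlowAction`, and the method returns a small `Outcome` enum instead of scheduling a Quartz trigger.

```java
import java.util.function.BooleanSupplier;

// Simplified, hypothetical stand-ins for the MultiActiveLeaseArbiter status hierarchy.
interface LeaseAttemptStatus {}

final class LeaseObtainedStatus implements LeaseAttemptStatus {}

final class LeasedToAnotherStatus implements LeaseAttemptStatus {}

final class NoLongerLeasingStatus implements LeaseAttemptStatus {}

enum Outcome { PERSISTED, REMINDER_SCHEDULED, NOTHING_TO_DO, UNHANDLED }

final class TriggerDispatch {
  private TriggerDispatch() {}

  static Outcome handleTriggerEvent(LeaseAttemptStatus status, BooleanSupplier persistFlowAction) {
    if (status instanceof LeaseObtainedStatus) {
      // Treat the lease as done only if persisting (and recording success) returned true;
      // otherwise fall back to a reminder so the event is revisited.
      return persistFlowAction.getAsBoolean() ? Outcome.PERSISTED : Outcome.REMINDER_SCHEDULED;
    } else if (status instanceof LeasedToAnotherStatus) {
      return Outcome.REMINDER_SCHEDULED;
    } else if (status instanceof NoLongerLeasingStatus) {
      return Outcome.NOTHING_TO_DO;
    }
    // Reached only for status types this method does not know about (where the warning belongs).
    return Outcome.UNHANDLED;
  }
}
```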



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -378,6 +374,26 @@ public void orchestrate(Spec spec, Properties jobProps, long triggerTimestampMil
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
   }
 
+  public void submitFlowToDagManager(FlowSpec flowSpec, Optional<Dag<JobExecutionPlan>> jobExecutionPlanDag)
+      throws IOException {
+    if (!jobExecutionPlanDag.isPresent()) {
+      jobExecutionPlanDag = Optional.of(specCompiler.compileFlow(flowSpec));
+    }

Review Comment:
   while `Optional` is generally great, it's only needed when callers don't know whether or not they are holding on to a `Dag<>`. when that is statically known by the caller, it's clearer to overload `submitFlowToDagManager` into both a unary and a binary form.
   
   here the unary form merely compiles the DAG before forwarding/delegating to the binary one:
   ```
   public void submitFlowToDagManager(FlowSpec flowSpec) throws IOException {
     submitFlowToDagManager(flowSpec, specCompiler.compileFlow(flowSpec));
   }
   ```



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config, 
MultiActiveLeaseArbiter leaseDet
    * @param eventTimeMillis
    * @throws IOException
    */
-  public void handleNewSchedulerEvent(Properties jobProps, 
DagActionStore.DagAction flowAction, long eventTimeMillis)
+  public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction 
flowAction, long eventTimeMillis)
       throws IOException {
-    LeaseAttemptStatus leaseAttemptStatus =
+    MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
         multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
     // TODO: add a log event or metric for each of these cases
-    switch (leaseAttemptStatus.getClass().getSimpleName()) {
-      case "LeaseObtainedStatus":
-        finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
-        break;
-      case "LeasedToAnotherStatus":
-        scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus) 
leaseAttemptStatus, flowAction, eventTimeMillis);
-        break;
-      case "NoLongerLeasingStatus":
-        break;
-      default:
+    if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+      persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus) 
leaseAttemptStatus, flowAction);
+      return;
+    } else if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+      scheduleReminderForEvent(jobProps, 
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+          eventTimeMillis);
+    } else if (leaseAttemptStatus instanceof  
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+      return;
     }
+    log.warn("Received type of leaseAttemptStatus: {} not handled by this 
method", leaseAttemptStatus.getClass().getName());
   }
 
   // Called after obtaining a lease to persist the flow action to {@link 
DagActionStore} and mark the lease as done
-  private boolean finalizeLease(LeaseObtainedStatus status, 
DagActionStore.DagAction flowAction) {
+  private boolean 
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status, 
DagActionStore.DagAction flowAction) {
     try {
       this.dagActionStore.addDagAction(flowAction.getFlowGroup(), 
flowAction.getFlowName(),
           flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
-      if (this.dagActionStore.exists(flowAction.getFlowGroup(), 
flowAction.getFlowName(),
-          flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
-        // If the flow action has been persisted to the {@link DagActionStore} 
we can close the lease
-        this.numLeasesCompleted.mark();
-        return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction, 
status.getEventTimestamp(),
-            status.getMyLeaseAcquisitionTimestamp());
-      }
-    } catch (IOException | SQLException e) {
+      // If the flow action has been persisted to the {@link DagActionStore} 
we can close the lease
+      this.numLeasesCompleted.mark();
+      return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction, 
status);
+    } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    // TODO: should this return an error or print a warning log if failed to 
commit to dag action store?
-    return false;
   }
 
   /**
-   * This method is used by {@link 
SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for 
itself
-   * to check on the other participant's progress to finish acting on a flow 
action after the time the lease should
-   * expire.
+   * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to 
schedule a reminder for itself to check on
+   * the other participant's progress to finish acting on a flow action after 
the time the lease should expire.
    * @param jobProps
    * @param status used to extract event to be reminded for and the minimum 
time after which reminder should occur
    * @param originalEventTimeMillis the event timestamp we were originally 
handling
    * @param flowAction
    */
-  private void scheduleReminderForEvent(Properties jobProps, 
LeasedToAnotherStatus status,
+  private void scheduleReminderForEvent(Properties jobProps, 
MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
       DagActionStore.DagAction flowAction, long originalEventTimeMillis) {
     // Add a small randomization to the minimum reminder wait time to avoid 
'thundering herd' issue
-    String cronExpression = 
createCronFromDelayPeriod(status.getMinimumReminderWaitMillis() + 
random.nextInt(staggerUpperBoundSec));
+    String cronExpression = 
createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
+        + random.nextInt(schedulerMaxBackoffMillis));
     jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
-    // Ensure we save the event timestamp that we're setting reminder for, in 
addition to our own event timestamp which may be different
-    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY,
 String.valueOf(status.getReminderEventTimeMillis()));
-    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY,
 String.valueOf(status.getReminderEventTimeMillis()));
+    // Ensure we save the event timestamp that we're setting reminder for to 
have for debugging purposes
+    // in addition to the event we want to initiate
+    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
+        String.valueOf(status.getEventTimeMillis()));
+    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
+        String.valueOf(originalEventTimeMillis));
     JobKey key = new JobKey(flowAction.getFlowName(), 
flowAction.getFlowGroup());
-    Trigger trigger = this.jobScheduler.getTrigger(key, jobProps);
+    // Create a new trigger for the flow in job scheduler that is set to fire 
at the minimum reminder wait time calculated
+    Trigger trigger = this.jobScheduler.createTriggerForJob(key, jobProps);

Review Comment:
   would be ideal if this invocation could be `static`!



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config, MultiActiveLeaseArbiter leaseDet
    * @param eventTimeMillis
    * @throws IOException
    */
-  public void handleNewSchedulerEvent(Properties jobProps, DagActionStore.DagAction flowAction, long eventTimeMillis)
+  public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flowAction, long eventTimeMillis)
       throws IOException {
-    LeaseAttemptStatus leaseAttemptStatus =
+    MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
         multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
     // TODO: add a log event or metric for each of these cases
-    switch (leaseAttemptStatus.getClass().getSimpleName()) {
-      case "LeaseObtainedStatus":
-        finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
-        break;
-      case "LeasedToAnotherStatus":
-        scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus) leaseAttemptStatus, flowAction, eventTimeMillis);
-        break;
-      case "NoLongerLeasingStatus":
-        break;
-      default:
+    if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+      persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus, flowAction);
+      return;
+    } else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+      scheduleReminderForEvent(jobProps, (MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+          eventTimeMillis);
+    } else if (leaseAttemptStatus instanceof  MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+      return;
     }
+    log.warn("Received type of leaseAttemptStatus: {} not handled by this method", leaseAttemptStatus.getClass().getName());
   }
 
   // Called after obtaining a lease to persist the flow action to {@link DagActionStore} and mark the lease as done
-  private boolean finalizeLease(LeaseObtainedStatus status, DagActionStore.DagAction flowAction) {
+  private boolean persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status, DagActionStore.DagAction flowAction) {
     try {
       this.dagActionStore.addDagAction(flowAction.getFlowGroup(), flowAction.getFlowName(),
           flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
-      if (this.dagActionStore.exists(flowAction.getFlowGroup(), flowAction.getFlowName(),
-          flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
-        // If the flow action has been persisted to the {@link DagActionStore} we can close the lease
-        this.numLeasesCompleted.mark();
-        return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction, status.getEventTimestamp(),
-            status.getMyLeaseAcquisitionTimestamp());
-      }
-    } catch (IOException | SQLException e) {
+      // If the flow action has been persisted to the {@link DagActionStore} we can close the lease
+      this.numLeasesCompleted.mark();
+      return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction, status);
+    } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    // TODO: should this return an error or print a warning log if failed to commit to dag action store?
-    return false;
   }
 
   /**
-   * This method is used by {@link SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for itself
-   * to check on the other participant's progress to finish acting on a flow action after the time the lease should
-   * expire.
+   * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to schedule a reminder for itself to check on

Review Comment:
   a. I like the `@link`
   b. "a reminder for itself" => "a self-reminder"
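   The `instanceof` chain in `handleTriggerEvent` replaces the old switch on `getClass().getSimpleName()`. As written in the `+` lines, the `LeasedToAnotherStatus` branch does not `return`, so after scheduling a reminder control also reaches the trailing "not handled" `log.warn`. A minimal sketch of the same dispatch where every handled branch returns; the status types are hypothetical stand-ins for the `MultiActiveLeaseArbiter` classes, and the method records action names instead of calling `persistFlowAction` or `scheduleReminderForEvent`.

```java
import java.util.ArrayList;
import java.util.List;

final class LeaseDispatchSketch {
  // Hypothetical stand-ins for the MultiActiveLeaseArbiter status types; not the real Gobblin classes.
  interface LeaseAttemptStatus {}
  static final class LeaseObtainedStatus implements LeaseAttemptStatus {}
  static final class LeasedToAnotherStatus implements LeaseAttemptStatus {}
  static final class NoLongerLeasingStatus implements LeaseAttemptStatus {}

  // Returns the names of the actions taken instead of performing them, so each branch is easy to observe.
  static List<String> handleTriggerEvent(LeaseAttemptStatus status) {
    List<String> actions = new ArrayList<>();
    if (status instanceof LeaseObtainedStatus) {
      actions.add("persistFlowAction");
      return actions;
    } else if (status instanceof LeasedToAnotherStatus) {
      actions.add("scheduleReminderForEvent");
      // Without this return, control would also reach the "not handled" warning below.
      return actions;
    } else if (status instanceof NoLongerLeasingStatus) {
      // No further leasing is needed for this event.
      return actions;
    }
    actions.add("warn: unhandled " + status.getClass().getName());
    return actions;
  }

  public static void main(String[] args) {
    System.out.println(handleTriggerEvent(new LeasedToAnotherStatus())); // prints [scheduleReminderForEvent]
  }
}
```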



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -96,82 +93,79 @@ public SchedulerLeaseAlgoHandler(Config config, MultiActiveLeaseArbiter leaseDet
    * @param eventTimeMillis
    * @throws IOException
    */
-  public void handleNewSchedulerEvent(Properties jobProps, DagActionStore.DagAction flowAction, long eventTimeMillis)
+  public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flowAction, long eventTimeMillis)
       throws IOException {
-    LeaseAttemptStatus leaseAttemptStatus =
+    MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
         multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
     // TODO: add a log event or metric for each of these cases
-    switch (leaseAttemptStatus.getClass().getSimpleName()) {
-      case "LeaseObtainedStatus":
-        finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
-        break;
-      case "LeasedToAnotherStatus":
-        scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus) leaseAttemptStatus, flowAction, eventTimeMillis);
-        break;
-      case "NoLongerLeasingStatus":
-        break;
-      default:
+    if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+      persistFlowAction((MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus, flowAction);
+      return;
+    } else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+      scheduleReminderForEvent(jobProps, (MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, flowAction,
+          eventTimeMillis);
+    } else if (leaseAttemptStatus instanceof  MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+      return;
     }
+    log.warn("Received type of leaseAttemptStatus: {} not handled by this method", leaseAttemptStatus.getClass().getName());
   }
 
   // Called after obtaining a lease to persist the flow action to {@link DagActionStore} and mark the lease as done
-  private boolean finalizeLease(LeaseObtainedStatus status, DagActionStore.DagAction flowAction) {
+  private boolean persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus status, DagActionStore.DagAction flowAction) {
     try {
       this.dagActionStore.addDagAction(flowAction.getFlowGroup(), flowAction.getFlowName(),
           flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
-      if (this.dagActionStore.exists(flowAction.getFlowGroup(), flowAction.getFlowName(),
-          flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
-        // If the flow action has been persisted to the {@link DagActionStore} we can close the lease
-        this.numLeasesCompleted.mark();
-        return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction, status.getEventTimestamp(),
-            status.getMyLeaseAcquisitionTimestamp());
-      }
-    } catch (IOException | SQLException e) {
+      // If the flow action has been persisted to the {@link DagActionStore} we can close the lease
+      this.numLeasesCompleted.mark();
+      return this.multiActiveLeaseArbiter.recordLeaseSuccess(flowAction, status);
+    } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    // TODO: should this return an error or print a warning log if failed to commit to dag action store?
-    return false;
   }
 
   /**
-   * This method is used by {@link SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for itself
-   * to check on the other participant's progress to finish acting on a flow action after the time the lease should
-   * expire.
+   * This method is used by FlowTriggerHandler.handleNewSchedulerEvent to schedule a reminder for itself to check on
+   * the other participant's progress to finish acting on a flow action after the time the lease should expire.
    * @param jobProps
    * @param status used to extract event to be reminded for and the minimum time after which reminder should occur
    * @param originalEventTimeMillis the event timestamp we were originally handling
    * @param flowAction
    */
-  private void scheduleReminderForEvent(Properties jobProps, LeasedToAnotherStatus status,
+  private void scheduleReminderForEvent(Properties jobProps, MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
       DagActionStore.DagAction flowAction, long originalEventTimeMillis) {
     // Add a small randomization to the minimum reminder wait time to avoid 'thundering herd' issue
-    String cronExpression = createCronFromDelayPeriod(status.getMinimumReminderWaitMillis() + random.nextInt(staggerUpperBoundSec));
+    String cronExpression = createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
+        + random.nextInt(schedulerMaxBackoffMillis));
     jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
-    // Ensure we save the event timestamp that we're setting reminder for, in addition to our own event timestamp which may be different
-    jobProps.setProperty(ConfigurationKeys.SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY, String.valueOf(status.getReminderEventTimeMillis()));
-    jobProps.setProperty(ConfigurationKeys.SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY, String.valueOf(status.getReminderEventTimeMillis()));
+    // Ensure we save the event timestamp that we're setting reminder for to have for debugging purposes
+    // in addition to the event we want to initiate
+    jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
+        String.valueOf(status.getEventTimeMillis()));
+    jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
+        String.valueOf(originalEventTimeMillis));
     JobKey key = new JobKey(flowAction.getFlowName(), flowAction.getFlowGroup());
-    Trigger trigger = this.jobScheduler.getTrigger(key, jobProps);
+    // Create a new trigger for the flow in job scheduler that is set to fire at the minimum reminder wait time calculated
+    Trigger trigger = this.jobScheduler.createTriggerForJob(key, jobProps);
     try {
-      LOG.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] -  attempting to schedule reminder for event %s in %s millis",
-          flowAction, originalEventTimeMillis, status.getReminderEventTimeMillis(), trigger.getNextFireTime());
+      log.info("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] -  attempting to schedule reminder for event %s in %s millis",
+          flowAction, originalEventTimeMillis, status.getEventTimeMillis(), trigger.getNextFireTime());
       this.schedulerService.getScheduler().scheduleJob(trigger);
     } catch (SchedulerException e) {
-      LOG.warn("Failed to add job reminder due to SchedulerException for job %s trigger event %s ", key, status.getReminderEventTimeMillis(), e);
+      log.warn("Failed to add job reminder due to SchedulerException for job %s trigger event %s ", key, status.getEventTimeMillis(), e);
     }
-    LOG.info(String.format("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] - SCHEDULED REMINDER for event %s in %s millis",
-        flowAction, originalEventTimeMillis, status.getReminderEventTimeMillis(), trigger.getNextFireTime()));
+    log.info(String.format("Scheduler Lease Algo Handler - [%s, eventTimestamp: %s] - SCHEDULED REMINDER for event %s in %s millis",
+        flowAction, originalEventTimeMillis, status.getEventTimeMillis(), trigger.getNextFireTime()));
   }
 
   /**
    * These methods should only be called from the Orchestrator or JobScheduler classes as it directly adds jobs to the
    * Quartz scheduler
-   * @param delayPeriodSeconds
+   * @param delayPeriodMillis
    * @return
    */
-  protected static String createCronFromDelayPeriod(long delayPeriodSeconds) {
+  protected static String createCronFromDelayPeriod(long delayPeriodMillis) {
     LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC"));
-    LocalDateTime delaySecondsLater = now.plus(delayPeriodSeconds, ChronoUnit.SECONDS);
+    LocalDateTime delaySecondsLater = now.plus(delayPeriodMillis, ChronoUnit.MILLIS);

Review Comment:
   nit: `delaySecondsLater` is neither seconds nor an interval, but an absolute time in millis
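   The rest of `createCronFromDelayPeriod` is cut off in the diff. A minimal sketch of the whole path, assuming the method renders the fire time as a one-shot Quartz cron expression (seconds, minutes, hours, day-of-month, month, day-of-week, year): the jittered delay, an absolute fire time with a name that says so (`reminderFireTime` is a hypothetical name), and the cron string. `jitteredDelayMillis` and `cronFor` are hypothetical helpers, not Gobblin methods.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.Random;

final class CronFromDelaySketch {
  // Minimum linger plus uniform jitter in [0, maxBackoffMillis), so hosts told that another participant
  // holds the lease do not all wake at the same instant. Random.nextInt requires maxBackoffMillis > 0.
  static long jitteredDelayMillis(long minimumLingerMillis, int maxBackoffMillis, Random random) {
    return minimumLingerMillis + random.nextInt(maxBackoffMillis);
  }

  // ASSUMPTION: one-shot Quartz-style cron, "sec min hour day-of-month month ? year".
  static String cronFor(LocalDateTime fireTimeUtc) {
    return String.format("%d %d %d %d %d ? %d",
        fireTimeUtc.getSecond(), fireTimeUtc.getMinute(), fireTimeUtc.getHour(),
        fireTimeUtc.getDayOfMonth(), fireTimeUtc.getMonthValue(), fireTimeUtc.getYear());
  }

  static String createCronFromDelayPeriod(long delayPeriodMillis) {
    LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC"));
    // An absolute point in time, not an interval, hence the name.
    LocalDateTime reminderFireTime = now.plus(delayPeriodMillis, ChronoUnit.MILLIS);
    // Cron has one-second resolution, so any sub-second part of the delay is dropped here.
    return cronFor(reminderFireTime);
  }

  public static void main(String[] args) {
    long delay = jitteredDelayMillis(60_000L, 5_000, new Random());
    System.out.println(createCronFromDelayPeriod(delay));
  }
}
```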



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -165,9 +161,8 @@ public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Op
 
   @Inject
   public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator, Optional<TopologyCatalog> topologyCatalog,
-      Optional<DagManager> dagManager, Optional<Logger> log, @Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean multiActiveSchedulerEnabled,
-      SchedulerLeaseAlgoHandler schedulerLeaseAlgoHandler) {
-    this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true, multiActiveSchedulerEnabled, schedulerLeaseAlgoHandler);
+      Optional<DagManager> dagManager, Optional<Logger> log, Optional<FlowTriggerHandler> schedulerLeaseAlgoHandler) {

Review Comment:
   need to rename `schedulerLeaseAlgoHandler`



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -66,24 +67,37 @@
  * host should actually complete its work while having the lease and then mark the flow action as NULL to indicate no
  * further leasing should be done for the event.
  */
-public class MySQLMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
+@Slf4j
+public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
   /** `j.u.Function` variant for an operation that may @throw IOException or SQLException: preserves method signature checked exceptions */
   @FunctionalInterface
   protected interface CheckedFunction<T, R> {
     R apply(T t) throws IOException, SQLException;
   }
 
-  public static final String CONFIG_PREFIX = "MySQLMultiActiveLeaseArbiter";
+  public static final String CONFIG_PREFIX = "MysqlMultiActiveLeaseArbiter";
 
   protected final DataSource dataSource;
   private final String leaseArbiterTableName;
   private final String constantsTableName;
   private final int epsilon;
   private final int linger;
+
+  // TODO: define retention on this table
+  private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %S ("
+      + "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar("
+      + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + "flow_execution_id varchar("
+      + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, flow_action varchar(100) NOT NULL, "
+      + "event_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, "

Review Comment:
   suggest no default since probably shouldn't be possible to insert w/o a specific `event_timestamp`
   
   (the default for `lease_acq_tstamp` seems reasonable)
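   A minimal sketch of the statement with that suggestion applied, covering only the columns visible in the diff plus one lease-acquisition column. The length constants are illustrative placeholders, not the real `ServiceConfigKeys` values. `lease_acquisition_timestamp` is a hypothetical column name. The sketch gives `flow_name` its own (hypothetical) length constant where the diff reuses `MAX_FLOW_GROUP_LENGTH`. It also uses `%s` rather than `%S`, because `String.format`'s `%S` upper-cases its argument, table name included.

```java
final class LeaseArbiterTableSketch {
  // Illustrative placeholder lengths; the real values come from ServiceConfigKeys and are not shown in the diff.
  static final int MAX_FLOW_GROUP_LENGTH = 128;
  static final int MAX_FLOW_NAME_LENGTH = 128;
  static final int MAX_FLOW_EXECUTION_ID_LENGTH = 13;

  // Lowercase %s keeps the table name's case; %S would upper-case it.
  static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
      + "flow_group varchar(" + MAX_FLOW_GROUP_LENGTH + ") NOT NULL, "
      + "flow_name varchar(" + MAX_FLOW_NAME_LENGTH + ") NOT NULL, "
      + "flow_execution_id varchar(" + MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, "
      + "flow_action varchar(100) NOT NULL, "
      // No default: every insert has to supply the trigger event's own timestamp.
      + "event_timestamp TIMESTAMP NOT NULL, "
      // Hypothetical column name; defaulting the acquisition time to "now" follows the review comment.
      + "lease_acquisition_timestamp TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP)";

  static String createTableStatement(String tableName) {
    return String.format(CREATE_LEASE_ARBITER_TABLE_STATEMENT, tableName);
  }

  public static void main(String[] args) {
    System.out.println(createTableStatement("example_lease_arbiter"));
  }
}
```

   One MySQL caveat worth checking: when the server runs with `explicit_defaults_for_timestamp` disabled, the first TIMESTAMP column declared without `NULL` or an explicit `DEFAULT` is automatically given `DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP`. On such servers, dropping the `DEFAULT` clause from `event_timestamp` alone may not remove its default.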





Issue Time Tracking
-------------------

    Worklog Id:     (was: 865141)
    Time Spent: 13h 20m  (was: 13h 10m)

> Implement multi-active, non blocking for leader host
> ----------------------------------------------------
>
>                 Key: GOBBLIN-1837
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1837
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 13h 20m
>  Remaining Estimate: 0h
>
> This task will include the implementation of a non-blocking, multi-active 
> scheduler for each host. It will NOT include metric emission or unit tests 
> for validation. That will be done in a separate follow-up ticket. The work in 
> this ticket includes:
>  * define a table to do scheduler lease determination for each flow's trigger 
> event and related methods to execute actions on this table
>  * update DagActionStore schema and DagActionStoreMonitor to act upon new 
> "LAUNCH" type events in addition to KILL/RESUME
>  * update scheduler/orchestrator logic to apply the non-blocking algorithm 
> when "multi-active scheduler mode" is enabled, otherwise submit events 
> directly to the DagManager after receiving a scheduler trigger
>  * implement the non-blocking algorithm, particularly handling reminder 
> events if another host is in the process of securing the lease for a 
> particular flow trigger



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
