[ 
https://issues.apache.org/jira/browse/GOBBLIN-1837?focusedWorklogId=864298&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-864298
 ]

ASF GitHub Bot logged work on GOBBLIN-1837:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Jun/23 22:31
            Start Date: 07/Jun/23 22:31
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3700:
URL: https://github.com/apache/gobblin/pull/3700#discussion_r1222050277


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java:
##########
@@ -26,4 +26,5 @@ public final class InjectionNames {
   public static final String FORCE_LEADER = "forceLeader";
   public static final String FLOW_CATALOG_LOCAL_COMMIT = 
"flowCatalogLocalCommit";
   public static final String WARM_STANDBY_ENABLED = "statelessRestAPIEnabled";

Review Comment:
   Shall we add a TODO to rename "WARM_STANDBY", since it is a misnomer?  
Happily, it is not the actual value of the underlying config name, so it should 
be easy to replace.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java:
##########
@@ -56,7 +64,7 @@ private DagActionStoreChangeMonitor 
createDagActionStoreMonitor()
     String topic = ""; // Pass empty string because we expect underlying 
client to dynamically determine the Kafka topic
     int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig, 
DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
 
-    return new DagActionStoreChangeMonitor(topic, dagActionStoreChangeConfig, 
this.dagActionStore, this.dagManager, numThreads);
+    return new DagActionStoreChangeMonitor(topic, dagActionStoreChangeConfig, 
this.dagActionStore, this.dagManager, numThreads, 
isMultiActiveSchedulerEnabled, flowCatalog);

Review Comment:
   minor, but the ordering of the two new params is switched between the 
factory and the ctor of what it creates.  I suggest aligning.



##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -95,6 +95,20 @@ public class ConfigurationKeys {
   public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
   public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = 
"skip.scheduling.flows.after.num.days";
   public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
+  // Scheduler lease determination store configuration
+  public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = 
"multi.active.scheduler.constants.db.table";
+  public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE 
= "multi.active.scheduler.";
+  public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY 
= "scheduler.lease.determination.store.db.table";
+  public static final String 
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = 
"gobblin_scheduler_lease_determination_store";
+  public static final String SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY = 
"reminderEventTimestampMillis";
+  public static final String SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY = 
"newEventTimestampMillis";
+  public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = "";
+  public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 100;
+  // Note: linger should be on the order of seconds even though we measure in 
millis
+  public static final String SCHEDULER_EVENT_LINGER_MILLIS_KEY = "";
+  public static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 30000;
+  public static final String SCHEDULER_STAGGERING_UPPER_BOUND_SEC_KEY = "";
+  public static final int DEFAULT_SCHEDULER_STAGGERING_UPPER_BOUND_SEC = 5;

Review Comment:
   those above are in millis... might it be confusing to switch to secs for 
this one?
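
One way to keep the units uniform, sketched under assumptions: the staggering bound is expressed in millis like its neighbours. The constant name `DEFAULT_SCHEDULER_STAGGERING_UPPER_BOUND_MILLIS` and the class `SchedulerTimingDefaults` are hypothetical and do not appear in the PR; the two other values are copied from the hunk above.

```java
import java.util.concurrent.TimeUnit;

/** Illustrative holder only; the PR keeps these constants in ConfigurationKeys. */
final class SchedulerTimingDefaults {
  private SchedulerTimingDefaults() {}

  // Values copied from the hunk under review.
  static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 100;
  static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 30000;

  // Hypothetical millis-based counterpart of DEFAULT_SCHEDULER_STAGGERING_UPPER_BOUND_SEC = 5.
  static final int DEFAULT_SCHEDULER_STAGGERING_UPPER_BOUND_MILLIS =
      (int) TimeUnit.SECONDS.toMillis(5);

  public static void main(String[] args) {
    System.out.println("epsilon millis: " + DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS);
    System.out.println("linger millis: " + DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
    System.out.println("staggering upper bound millis: "
        + DEFAULT_SCHEDULER_STAGGERING_UPPER_BOUND_MILLIS);
  }
}
```

Spelling the conversion with `TimeUnit` keeps the human-friendly "5 seconds" visible at the definition while callers only ever see millis.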



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MySQLMultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import javax.sql.DataSource;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * MySQL based implementation of the {@link MultiActiveLeaseArbiter} which 
uses a MySQL store to resolve ownership of
+ * a flow event amongst multiple competing instances. A MySQL table is used to 
store flow identifying information as
+ * well as the flow action associated with it. It uses two additional values 
of the `event_timestamp` and
+ * `lease_acquisition_timestamp` to indicate an active lease, expired lease, 
and state of no longer leasing. The table
+ * schema is as follows:
+ * [flow_group | flow_name | flow_execution_id | flow_action | event_timestamp 
| lease_acquisition_timestamp]
+ * (----------------------primary key------------------------)
+ * We also maintain another table in the database with two constants that 
allow us to coordinate between instances and
+ * ensure they are using the same values to base their coordination off of.
+ * [epsilon | linger]
+ * `epsilon` - time within we consider to timestamps to be the same, to 
account for between-host clock drift
+ * `linger` - minimum time to occur before another host may attempt a lease on 
a flow event. It should be much greater
+ *            than epsilon and encapsulate executor communication latency 
including retry attempts
+ *
+ * The `event_timestamp` is the time of the flow_action event request.
+ * ---Event consolidation---
+ * Note that for the sake of simplification, we only allow one event 
associated with a particular flow's flow_action
+ * (ie: only one LAUNCH for example of flow FOO, but there can be a LAUNCH, 
KILL, & RESUME for flow FOO at once) during
+ * the time it takes to execute the flow action. In most cases, the execution 
time should be so negligible that this
+ * event consolidation of duplicate flow action requests is not noticed and 
even during executor downtime this behavior
+ * is acceptable as the user generally expects a timely execution of the most 
recent request rather than one execution
+ * per request.
+ *
+ * The `lease_acquisition_timestamp` is the time a host acquired ownership of 
this flow action, and it is valid for
+ * `linger` period of time after which it expires and any host can re-attempt 
ownership. In most cases, the original
+ * host should actually complete its work while having the lease and then mark 
the flow action as NULL to indicate no
+ * further leasing should be done for the event.
+ */
+public class MySQLMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {

Review Comment:
   spelled differently than `MysqlDagActionStore`.  the latter capitalization 
seems prevalent in our code base
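
The roles of `epsilon` and `linger` described in the Javadoc above can be sketched as two small predicates. This is a minimal illustration, not the PR's implementation: the class `LeaseTiming`, both method names, and the choice of inclusive boundaries are assumptions.

```java
/** Hypothetical helper illustrating the epsilon and linger semantics from the Javadoc. */
final class LeaseTiming {
  private LeaseTiming() {}

  /**
   * Two event timestamps that differ by at most epsilon are treated as the same event,
   * which absorbs clock drift between hosts. (Inclusive boundary is an assumption.)
   */
  static boolean isSameEvent(long eventMillisA, long eventMillisB, long epsilonMillis) {
    return Math.abs(eventMillisA - eventMillisB) <= epsilonMillis;
  }

  /**
   * A lease stays valid for linger millis after acquisition; once that time has passed,
   * any host may attempt the lease again. (Inclusive boundary is an assumption.)
   */
  static boolean isLeaseExpired(long leaseAcquisitionMillis, long nowMillis, long lingerMillis) {
    return nowMillis - leaseAcquisitionMillis >= lingerMillis;
  }

  public static void main(String[] args) {
    long epsilonMillis = 100;   // DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS in the PR
    long lingerMillis = 30000;  // DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS in the PR
    System.out.println(isSameEvent(1_000, 1_080, epsilonMillis));
    System.out.println(isLeaseExpired(1_000, 20_000, lingerMillis));
    System.out.println(isLeaseExpired(1_000, 31_000, lingerMillis));
  }
}
```

Because `linger` must cover executor latency and retries, it is expected to be much larger than `epsilon`, as the defaults of 30000 and 100 millis suggest.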



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeasedToAnotherStatus.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import lombok.Getter;
+
+/*
+The event in question has been leased to another. This object contains 
`reminderEventTimestamp` which is the event
+timestamp the lease is associated with as well as `minimumReminderWaitMillis` 
the minimum amount of time to wait
+before returning to check if the lease has completed or expired.
+ */
+public class LeasedToAnotherStatus extends LeaseAttemptStatus {
+  @Getter
+  private final long reminderEventTimeMillis;

Review Comment:
   merely `eventTimestamp` is preferable here, since that aligns w/ the naming 
in `LeaseObtainedStatus`



##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -95,6 +95,20 @@ public class ConfigurationKeys {
   public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
   public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = 
"skip.scheduling.flows.after.num.days";
   public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
+  // Scheduler lease determination store configuration
+  public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = 
"multi.active.scheduler.constants.db.table";
+  public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE 
= "multi.active.scheduler.";
+  public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY 
= "scheduler.lease.determination.store.db.table";
+  public static final String 
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = 
"gobblin_scheduler_lease_determination_store";
+  public static final String SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY = 
"reminderEventTimestampMillis";
+  public static final String SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY = 
"newEventTimestampMillis";
+  public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = "";
+  public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 100;
+  // Note: linger should be on the order of seconds even though we measure in 
millis
+  public static final String SCHEDULER_EVENT_LINGER_MILLIS_KEY = "";
+  public static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 30000;
+  public static final String SCHEDULER_STAGGERING_UPPER_BOUND_SEC_KEY = "";

Review Comment:
   `UPPER_BOUND` => `MAX` ?
   
   `STAGGERING` => `DELAY` ?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java:
##########
@@ -580,7 +581,9 @@ public void run() {
 
     private void clearUpDagAction(DagId dagId) throws IOException {
       if (this.dagActionStore.isPresent()) {
-        this.dagActionStore.get().deleteDagAction(dagId.flowGroup, 
dagId.flowName, dagId.flowExecutionId);
+        this.dagActionStore.get().deleteDagAction(
+            new DagActionStore.DagAction(dagId.flowGroup, dagId.flowName, 
dagId.flowExecutionId,
+                DagActionStore.FlowActionType.KILL));

Review Comment:
   how do we know for sure that this would be a `FlowActionType.KILL`?  (may be 
worth a code comment)



##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -95,6 +95,20 @@ public class ConfigurationKeys {
   public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
   public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = 
"skip.scheduling.flows.after.num.days";
   public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
+  // Scheduler lease determination store configuration
+  public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = 
"multi.active.scheduler.constants.db.table";
+  public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE 
= "multi.active.scheduler.";

Review Comment:
   is this supposed to be the default name for the db table?  if so, should it 
have '.' in it?  (I'm going by what I see for 
`DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE`)



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java:
##########
@@ -72,6 +73,8 @@ public class RuntimeMetrics {
   public static final String GOBBLIN_JOB_SCHEDULER_TOTAL_GET_SPEC_TIME_NANOS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".jobScheduler.totalGetSpecTimeNanos";
   public static final String GOBBLIN_JOB_SCHEDULER_TOTAL_ADD_SPEC_TIME_NANOS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".jobScheduler.totalAddSpecTimeNanos";
   public static final String 
GOBBLIN_JOB_SCHEDULER_NUM_JOBS_SCHEDULED_DURING_STARTUP = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".jobScheduler.numJobsScheduledDuringStartup";
+  // Metrics Used to Track SchedulerLeaseAlgoHandlerProgress
+  public static final String 
GOBBLIN_SCHEDULER_LEASE_ALGO_HANDLER_NUM_LEASES_COMPLETED = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".schedulerLeaseAlgoHandler.numLeasesCompleted";

Review Comment:
   nit: what does it mean to "complete" a lease?  for this host to be the one 
to get it... or for any host to get it?  for the lease to expire?  to actually 
accomplish something before it expired?
   
   let's seek a different word to unambiguously articulate what's measured.



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -440,12 +446,24 @@ public synchronized void scheduleJob(Properties jobProps, 
JobListener jobListene
   public void runJob(Properties jobProps, JobListener jobListener) throws 
JobException {
     try {
       Spec flowSpec = 
this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
-      this.orchestrator.orchestrate(flowSpec);
+      String triggerTimestampMillis = extractTriggerTimestampMillis(jobProps);

Review Comment:
   just wondering... what are the ramifications of neither of the keys being 
defined so the trigger timestamp winds up as `0L`?
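
To make the "neither key defined" case explicit rather than letting it collapse to `0L`, one option is to return an `OptionalLong`. The sketch below is hypothetical: the PR's `extractTriggerTimestampMillis` body is not shown here, the class name `TriggerTimestamps` is invented, and checking the reminder key before the new-event key is an assumption. Only the two key strings are taken from `ConfigurationKeys` in this PR.

```java
import java.util.OptionalLong;
import java.util.Properties;

/** Hypothetical sketch; not the PR's extractTriggerTimestampMillis. */
final class TriggerTimestamps {
  private TriggerTimestamps() {}

  // Key values as defined in ConfigurationKeys by this PR.
  static final String REMINDER_KEY = "reminderEventTimestampMillis";
  static final String NEW_EVENT_KEY = "newEventTimestampMillis";

  /** Returns the trigger timestamp if either key is set, otherwise an empty OptionalLong. */
  static OptionalLong extractTriggerTimestampMillis(Properties jobProps) {
    // Assumed precedence: prefer the reminder timestamp, fall back to the new-event one.
    String value = jobProps.getProperty(REMINDER_KEY, jobProps.getProperty(NEW_EVENT_KEY));
    return value == null ? OptionalLong.empty() : OptionalLong.of(Long.parseLong(value));
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    System.out.println(extractTriggerTimestampMillis(props).isPresent());
    props.setProperty(NEW_EVENT_KEY, "1686177060000");
    System.out.println(extractTriggerTimestampMillis(props).getAsLong());
  }
}
```

With this shape the caller has to decide what a missing timestamp means (skip, log, or fail) instead of silently proceeding with an epoch-zero value.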



##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -95,6 +95,20 @@ public class ConfigurationKeys {
   public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
   public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = 
"skip.scheduling.flows.after.num.days";
   public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
+  // Scheduler lease determination store configuration
+  public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = 
"multi.active.scheduler.constants.db.table";
+  public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE 
= "multi.active.scheduler.";
+  public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY 
= "scheduler.lease.determination.store.db.table";
+  public static final String 
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = 
"gobblin_scheduler_lease_determination_store";
+  public static final String SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY = 
"reminderEventTimestampMillis";
+  public static final String SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY = 
"newEventTimestampMillis";
+  public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = "";

Review Comment:
   empty string here (and for linger and staggering_upper_bound)?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java:
##########
@@ -54,27 +53,26 @@ public void 
resume(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, E
     String flowName = key.getKey().getFlowName();
     Long flowExecutionId = key.getKey().getFlowExecutionId();
     try {
-      // If an existing resume or kill request is still pending then do not 
accept this request
-      if (this.dagActionStore.exists(flowGroup, flowName, 
flowExecutionId.toString())) {
-        DagActionStore.DagActionValue action = 
this.dagActionStore.getDagAction(flowGroup, flowName, 
flowExecutionId.toString()).getDagActionValue();
-        this.handleException(flowGroup, flowName, flowExecutionId.toString(),
-            new RuntimeException("There is already a pending action " + action 
+ " for this flow. Please wait to resubmit and wait for"
+      // If an existing resume request is still pending then do not accept 
this request
+      if (this.dagActionStore.exists(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME)) {
+        this.handleException(flowGroup, flowName, flowExecutionId.toString(), 
DagActionStore.FlowActionType.RESUME,
+            new RuntimeException("There is already a pending RESUME action for 
this flow. Please wait to resubmit and wait for"

Review Comment:
   the signature of `handleException` is encouraging this anti-pattern of 
constructing an `Exception` just to tunnel through a message.  let's change the 
signature to take a `String message` as the final param, and push the 
invocation of `.getMessage()` up to the caller.
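
A minimal sketch of the suggested signature, with the message as the final parameter. Everything here is a simplified stand-in: `PendingActionReporter`, its in-memory list, and the local `FlowActionType` enum are hypothetical and only mirror the shape of the real handler.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the resource handler; names are illustrative only. */
final class PendingActionReporter {
  /** Local copy of two action types, so the sketch compiles on its own. */
  enum FlowActionType { KILL, RESUME }

  // Stands in for whatever the real handler logs or returns to the client.
  private final List<String> reported = new ArrayList<>();

  /** Takes the message directly, so callers need not build an exception just to carry text. */
  void handleException(String flowGroup, String flowName, String flowExecutionId,
      FlowActionType actionType, String message) {
    reported.add(String.format("[%s/%s/%s %s] %s",
        flowGroup, flowName, flowExecutionId, actionType, message));
  }

  List<String> reported() {
    return reported;
  }

  public static void main(String[] args) {
    PendingActionReporter reporter = new PendingActionReporter();
    // No exception exists yet: pass the text as-is.
    reporter.handleException("fg", "fn", "123", FlowActionType.RESUME,
        "There is already a pending RESUME action for this flow.");
    // A real exception was caught: the caller extracts the message.
    try {
      throw new IOException("store unavailable");
    } catch (IOException e) {
      reporter.handleException("fg", "fn", "123", FlowActionType.RESUME, e.getMessage());
    }
    reporter.reported().forEach(System.out::println);
  }
}
```

Call sites that already hold an exception pay one extra `.getMessage()` call, while call sites that only have text no longer allocate a throwaway `RuntimeException`.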



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -71,17 +88,38 @@ public String load(String key) throws Exception {
   protected DagActionStore dagActionStore;
 
   protected DagManager dagManager;
+  protected SpecCompiler specCompiler;
+  protected boolean isMultiActiveSchedulerEnabled;
+  protected FlowCatalog flowCatalog;
+  protected EventSubmitter eventSubmitter;
 
   // Note that the topic is an empty string (rather than null to avoid NPE) 
because this monitor relies on the consumer
   // client itself to determine all Kafka related information dynamically 
rather than through the config.
   public DagActionStoreChangeMonitor(String topic, Config config, 
DagActionStore dagActionStore, DagManager dagManager,
-      int numThreads) {
+      int numThreads, boolean isMultiActiveSchedulerEnabled, FlowCatalog 
flowCatalog) {
     // Differentiate group id for each host
     super(topic, config.withValue(GROUP_ID_KEY,
         ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX + 
UUID.randomUUID().toString())),
         numThreads);
     this.dagActionStore = dagActionStore;
     this.dagManager = dagManager;
+    ClassAliasResolver aliasResolver = new 
ClassAliasResolver(SpecCompiler.class);
+    try {
+      String specCompilerClassName = 
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
+      if 
(config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
+        specCompilerClassName = 
config.getString(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY);
+      }
+      log.info("Using specCompiler class name/alias " + specCompilerClassName);
+
+      this.specCompiler = (SpecCompiler) 
ConstructorUtils.invokeConstructor(Class.forName(aliasResolver.resolve(
+          specCompilerClassName)), config);
+    } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | InstantiationException
+             | ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }

Review Comment:
   if avoidable, I'd prefer to neither initialize nor maintain an additional 
`SpecCompiler` here, apart from the one already in the `Orchestrator`.  what 
would it look like if we initialized this class w/ the `Orchestrator` and then 
added a method to the latter handling essentially the `submitFlowToDagManager` 
defined just below?
   
   although you might still require a `FlowCatalog` on hand, the 
`EventSubmitter` could go away to also be encapsulated within the `Orchestrator`



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -177,15 +207,46 @@ protected void processMessage(DecodeableKafkaRecord 
message) {
     dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
   }
 
+  protected void submitFlowToDagManager(String flowGroup, String flowName) {
+    // Retrieve job execution plan by recompiling the flow spec to send to the 
DagManager
+    FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
+    FlowSpec spec = null;
+    try {
+      URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
+      spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+      Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(spec);
+      //Send the dag to the DagManager.
+      dagManager.addDag(jobExecutionPlanDag, true, true);
+    } catch (URISyntaxException e) {
+      log.warn("Could not create URI object for flowId {} due to error {}", 
flowId, e.getMessage());
+      this.unexpectedErrors.mark();
+      return;
+    } catch (SpecNotFoundException e) {
+      log.warn("Spec not found for flow group: {} name: {} Exception: {}", 
flowGroup, flowName, e);
+      this.unexpectedErrors.mark();
+      return;
+    } catch (IOException e) {
+      Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata(spec);
+      String failureMessage = "Failed to add Job Execution Plan due to: " + 
e.getMessage();
+      flowMetadata.put(TimingEvent.METADATA_MESSAGE, failureMessage);
+      new TimingEvent(this.eventSubmitter, 
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
+      log.warn("Failed to add Job Execution Plan for flow group: {} name: {} 
due to error {}", flowGroup, flowName, e);

Review Comment:
   again, `flowId`



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java:
##########
@@ -20,28 +20,31 @@
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Collection;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
+
+import lombok.Data;
 
 
 public interface DagActionStore {
-  enum DagActionValue {
-    KILL,
-    RESUME
+  enum FlowActionType {
+    KILL, // Kill invoked through API call
+    RESUME, // Resume flow invoked through API call
+    LAUNCH, // Launch new flow execution invoked adhoc or through scheduled 
trigger
+    RETRY, // Invoked through DagManager for flows configured to allow retries
+    CANCEL, // Invoked through DagManager if flow has been stuck in 
Orchestrated state for a while
+    ADVANCE // Launch next step in multi-hop dag
   }
 
-  @Getter
-  @EqualsAndHashCode
+  @Data
   class DagAction {
     String flowGroup;
     String flowName;
     String flowExecutionId;
-    DagActionValue dagActionValue;
-    public DagAction(String flowGroup, String flowName, String 
flowExecutionId, DagActionValue dagActionValue) {
+    FlowActionType flowActionType;
+    public DagAction(String flowGroup, String flowName, String 
flowExecutionId, FlowActionType flowActionType) {
       this.flowGroup = flowGroup;
       this.flowName = flowName;
       this.flowExecutionId = flowExecutionId;
-      this.dagActionValue = dagActionValue;
+      this.flowActionType = flowActionType;
     }

Review Comment:
   I actually thought `@Data` would synthesize such a ctor, but if not, then 
just add `@AllArgsConstructor`



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This interface defines a generic approach to a non-blocking, multiple 
active thread or host system, in which one or
+ * more active instances compete over ownership of a particular flow's event. 
The type of flow event in question does
+ * not impact the algorithm other than to uniquely identify the flow event. 
Each instance uses the interface to initiate
+ * an attempt at ownership over the flow event and receives a response 
indicating the status of the attempt.
+ *
+ * At a high level the lease arbiter works as follows:
+ *  1. Multiple instances receive knowledge of a flow action event to act upon
+ *  2. Each instance attempts to acquire rights or `a lease` to be the sole 
instance acting on the event by calling the
+ *      tryAcquireLease method below and receives the resulting status. The 
status indicates whether this instance has
+ *        a) acquired the lease -> then this instance will attempt to complete 
the lease
+ *        b) another has acquired the lease -> then another will attempt to 
complete the lease
+ *        c) flow event no longer needs to be acted upon -> terminal state
+ *  3. If another has acquired the lease, then the instance will check back in 
at the time of lease expiry to see if it
+ *    needs to attempt the lease again [status (b) above].
+ *  4. Once the instance which acquired the lease completes its work on the 
flow event, it calls completeLeaseUse to
+ *    indicate to all other instances that the flow event no longer needs to 
be acted upon [status (c) above]
+ */
+public interface MultiActiveLeaseArbiter {
+  static final Logger LOG = 
LoggerFactory.getLogger(MultiActiveLeaseArbiter.class);

Review Comment:
   instead use `@Slf4j`
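
The four steps in the interface Javadoc above can be sketched with a toy, single-process arbiter. This is not the PR's MySQL-backed implementation: the class `InMemoryLeaseArbiterSketch`, the `Status` enum, the string key, and both method signatures are assumptions (only the method names `tryAcquireLease` and `completeLeaseUse` come from the Javadoc), and event timestamps and `epsilon` are left out for brevity.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy in-memory illustration of the lease protocol; signatures are hypothetical. */
final class InMemoryLeaseArbiterSketch {
  /** Mirrors outcomes (a), (b) and (c) from the Javadoc. */
  enum Status { LEASE_OBTAINED, LEASED_TO_ANOTHER, NO_LONGER_LEASING }

  // Per flow-action key: lease acquisition time, or null once the work has been completed.
  private final Map<String, Long> leaseAcquiredAt = new HashMap<>();
  private final long lingerMillis;

  InMemoryLeaseArbiterSketch(long lingerMillis) {
    this.lingerMillis = lingerMillis;
  }

  /** Step 2: try to become the sole participant acting on the flow action event. */
  synchronized Status tryAcquireLease(String flowActionKey, long nowMillis) {
    if (!leaseAcquiredAt.containsKey(flowActionKey)) {
      leaseAcquiredAt.put(flowActionKey, nowMillis);
      return Status.LEASE_OBTAINED;
    }
    Long acquiredAt = leaseAcquiredAt.get(flowActionKey);
    if (acquiredAt == null) {
      return Status.NO_LONGER_LEASING;  // work already completed: terminal state
    }
    if (nowMillis - acquiredAt >= lingerMillis) {
      leaseAcquiredAt.put(flowActionKey, nowMillis);  // previous lease expired: take over
      return Status.LEASE_OBTAINED;
    }
    return Status.LEASED_TO_ANOTHER;  // step 3: caller should check back at lease expiry
  }

  /** Step 4: mark the event as done so that nobody leases it again. */
  synchronized void completeLeaseUse(String flowActionKey) {
    leaseAcquiredAt.put(flowActionKey, null);
  }

  public static void main(String[] args) {
    InMemoryLeaseArbiterSketch arbiter = new InMemoryLeaseArbiterSketch(30_000);
    String key = "group/name/LAUNCH";
    System.out.println(arbiter.tryAcquireLease(key, 0));
    System.out.println(arbiter.tryAcquireLease(key, 1_000));
    arbiter.completeLeaseUse(key);
    System.out.println(arbiter.tryAcquireLease(key, 2_000));
  }
}
```

In the PR the same decisions are made against a shared MySQL table so that multiple hosts see one consistent view; the `synchronized` map here only imitates that within a single JVM.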



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This interface defines a generic approach to a non-blocking, multiple 
active thread or host system, in which one or
+ * more active instances compete over ownership of a particular flow's event. 
The type of flow event in question does
+ * not impact the algorithm other than to uniquely identify the flow event. 
Each instance uses the interface to initiate
+ * an attempt at ownership over the flow event and receives a response 
indicating the status of the attempt.
+ *
+ * At a high level the lease arbiter works as follows:
+ *  1. Multiple instances receive knowledge of a flow action event to act upon
+ *  2. Each instance attempts to acquire rights or `a lease` to be the sole 
instance acting on the event by calling the
+ *      tryAcquireLease method below and receives the resulting status. The 
status indicates whether this instance has
+ *        a) acquired the lease -> then this instance will attempt to complete 
the lease
+ *        b) another has acquired the lease -> then another will attempt to 
complete the lease
+ *        c) flow event no longer needs to be acted upon -> terminal state
+ *  3. If another has acquired the lease, then the instance will check back in 
at the time of lease expiry to see if it
+ *    needs to attempt the lease again [status (b) above].
+ *  4. Once the instance which acquired the lease completes its work on the 
flow event, it calls completeLeaseUse to
+ *    indicate to all other instances that the flow event no longer needs to 
be acted upon [status (c) above]
+ */
+public interface MultiActiveLeaseArbiter {
+  static final Logger LOG = 
LoggerFactory.getLogger(MultiActiveLeaseArbiter.class);
+
+  /**
+   * This method attempts to insert an entry into store for a particular flow 
action event if one does not already
+   * exist in the store for the flow action or has expired. Regardless of the 
outcome it also reads the lease
+   * acquisition timestamp of the entry for that flow action event (it could 
have pre-existed in the table or been newly
+   *  added by the previous write). Based on the transaction results, it will 
return @LeaseAttemptStatus to determine

Review Comment:
   `{@link LeaseAttemptStatus}`



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java:
##########
@@ -20,28 +20,31 @@
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Collection;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
+
+import lombok.Data;
 
 
 public interface DagActionStore {
-  enum DagActionValue {
-    KILL,
-    RESUME
+  enum FlowActionType {

Review Comment:
   better name--nice!



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This interface defines a generic approach to a non-blocking, multiple 
active thread or host system, in which one or
+ * more active instances compete over ownership of a particular flow's event. 
The type of flow event in question does

Review Comment:
   suggestions:
   "instances" => "participants" (to avoid OO connotations)
   
   "over ownership of" => "to take responsibility for"



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This interface defines a generic approach to a non-blocking, multiple 
active thread or host system, in which one or
+ * more active instances compete over ownership of a particular flow's event. 
The type of flow event in question does
+ * not impact the algorithm other than to uniquely identify the flow event. 
Each instance uses the interface to initiate
+ * an attempt at ownership over the flow event and receives a response 
indicating the status of the attempt.
+ *
+ * At a high level the lease arbiter works as follows:
+ *  1. Multiple instances receive knowledge of a flow action event to act upon
+ *  2. Each instance attempts to acquire rights or `a lease` to be the sole 
instance acting on the event by calling the
+ *      tryAcquireLease method below and receives the resulting status. The 
status indicates whether this instance has
+ *        a) acquired the lease -> then this instance will attempt to complete 
the lease

Review Comment:
   "to complete the lease" => "to carry out the required action before the 
lease expires"
   
   for clarity's sake, I'd suggest stating the class that each of the three 
corresponds to, as in:
   "acquired the lease (`LeaseObtainedStatus` returned) -> ..."



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java:
##########
@@ -54,27 +53,26 @@ public void 
resume(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, E
     String flowName = key.getKey().getFlowName();
     Long flowExecutionId = key.getKey().getFlowExecutionId();
     try {
-      // If an existing resume or kill request is still pending then do not 
accept this request
-      if (this.dagActionStore.exists(flowGroup, flowName, 
flowExecutionId.toString())) {
-        DagActionStore.DagActionValue action = 
this.dagActionStore.getDagAction(flowGroup, flowName, 
flowExecutionId.toString()).getDagActionValue();
-        this.handleException(flowGroup, flowName, flowExecutionId.toString(),
-            new RuntimeException("There is already a pending action " + action 
+ " for this flow. Please wait to resubmit and wait for"
+      // If an existing resume request is still pending then do not accept 
this request
+      if (this.dagActionStore.exists(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME)) {
+        this.handleException(flowGroup, flowName, flowExecutionId.toString(), 
DagActionStore.FlowActionType.RESUME,
+            new RuntimeException("There is already a pending RESUME action for 
this flow. Please wait to resubmit and wait for"
                 + " action to be completed."));
         return;
       }
-      this.dagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.DagActionValue.RESUME);
-    } catch (IOException | SQLException | SpecNotFoundException e) {
+      this.dagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME);
+    } catch (IOException | SQLException e) {
       log.warn(
           String.format("Failed to add execution resume action for flow %s %s 
%s to dag action store due to", flowGroup,
               flowName, flowExecutionId), e);
-      this.handleException(flowGroup, flowName, flowExecutionId.toString(), e);
+      this.handleException(flowGroup, flowName, flowExecutionId.toString(), 
DagActionStore.FlowActionType.RESUME, e);
     }
 
   }
 
-  private void handleException (String flowGroup, String flowName, String 
flowExecutionId, Exception e) {
+  private void handleException (String flowGroup, String flowName, String 
flowExecutionId, DagActionStore.FlowActionType flowActionType, Exception e) {

Review Comment:
   nit: extra space after method name



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This interface defines a generic approach to a non-blocking, multiple 
active thread or host system, in which one or
+ * more active instances compete over ownership of a particular flow's event. 
The type of flow event in question does
+ * not impact the algorithm other than to uniquely identify the flow event. 
Each instance uses the interface to initiate
+ * an attempt at ownership over the flow event and receives a response 
indicating the status of the attempt.
+ *
+ * At a high level the lease arbiter works as follows:
+ *  1. Multiple instances receive knowledge of a flow action event to act upon

Review Comment:
   "independently learn of"



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseAttemptStatus.java:
##########
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+public abstract class LeaseAttemptStatus {
+  protected LeaseAttemptStatus() {}

Review Comment:
   I know the default ctor will be `public`, but just go w/ that.  besides, this 
class is `abstract`, so no one can instantiate it directly anyway



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseAttemptStatus.java:
##########
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+public abstract class LeaseAttemptStatus {

Review Comment:
   I agree w/ defining this as the common base of a three-way alt. (roughly 
modeling an algebraic data type).  for clarity's sake, however, let's keep the 
entire hierarchy together rather than splintering it across four files.
   
   do so by making these static inner classes of `MultiActiveLeaseArbiter`
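
   As a minimal sketch of this suggestion (not code from the PR): the three 
statuses can live next to the method that returns them. The interface name 
`MultiActiveLeaseArbiterSketch` and the `String` stand-in for 
`DagActionStore.DagAction` are assumptions made only to keep the example 
self-contained.

```java
import java.io.IOException;

// Sketch only: "MultiActiveLeaseArbiterSketch" and the String flowAction are stand-ins.
// Types nested in an interface are implicitly public and static.
interface MultiActiveLeaseArbiterSketch {

  /** Common base of the three possible outcomes of a lease attempt. */
  abstract class LeaseAttemptStatus {}

  /** The caller acquired the lease. */
  final class LeaseObtainedStatus extends LeaseAttemptStatus {
    private final long eventTimestamp;
    private final long leaseAcquisitionTimestamp;

    public LeaseObtainedStatus(long eventTimestamp, long leaseAcquisitionTimestamp) {
      this.eventTimestamp = eventTimestamp;
      this.leaseAcquisitionTimestamp = leaseAcquisitionTimestamp;
    }

    public long getEventTimestamp() { return eventTimestamp; }
    public long getLeaseAcquisitionTimestamp() { return leaseAcquisitionTimestamp; }
  }

  /** Another participant currently holds the lease. */
  final class LeasedToAnotherStatus extends LeaseAttemptStatus {}

  /** The event no longer needs to be acted upon. */
  final class NoLongerLeasingStatus extends LeaseAttemptStatus {}

  LeaseAttemptStatus tryAcquireLease(String flowAction, long eventTimeMillis) throws IOException;
}
```

   Callers would then refer to `MultiActiveLeaseArbiterSketch.LeaseObtainedStatus` 
and friends, which keeps the whole alternative visible in one file.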



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This interface defines a generic approach to a non-blocking, multiple 
active thread or host system, in which one or
+ * more active instances compete over ownership of a particular flow's event. 
The type of flow event in question does
+ * not impact the algorithm other than to uniquely identify the flow event. 
Each instance uses the interface to initiate
+ * an attempt at ownership over the flow event and receives a response 
indicating the status of the attempt.
+ *
+ * At a high level the lease arbiter works as follows:
+ *  1. Multiple instances receive knowledge of a flow action event to act upon
+ *  2. Each instance attempts to acquire rights or `a lease` to be the sole 
instance acting on the event by calling the
+ *      tryAcquireLease method below and receives the resulting status. The 
status indicates whether this instance has
+ *        a) acquired the lease -> then this instance will attempt to complete 
the lease
+ *        b) another has acquired the lease -> then another will attempt to 
complete the lease
+ *        c) flow event no longer needs to be acted upon -> terminal state
+ *  3. If another has acquired the lease, then the instance will check back in 
at the time of lease expiry to see if it
+ *    needs to attempt the lease again [status (b) above].
+ *  4. Once the instance which acquired the lease completes its work on the 
flow event, it calls completeLeaseUse to
+ *    indicate to all other instances that the flow event no longer needs to 
be acted upon [status (c) above]
+ */
+public interface MultiActiveLeaseArbiter {
+  static final Logger LOG = 
LoggerFactory.getLogger(MultiActiveLeaseArbiter.class);
+
+  /**
+   * This method attempts to insert an entry into store for a particular flow 
action event if one does not already
+   * exist in the store for the flow action or has expired. Regardless of the 
outcome it also reads the lease
+   * acquisition timestamp of the entry for that flow action event (it could 
have pre-existed in the table or been newly
+   *  added by the previous write). Based on the transaction results, it will 
return @LeaseAttemptStatus to determine
+   *  the next action.
+   * @param flowAction uniquely identifies the flow
+   * @param eventTimeMillis is the time this flow action should occur
+   * @return LeaseAttemptStatus
+   * @throws IOException
+   */
+  LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long 
eventTimeMillis) throws IOException;
+
+  /**
+   * This method is used to indicate the owner of the lease has successfully 
completed required actions while holding
+   * the lease of the flow action event. It marks the lease as "no longer 
leasing", if the eventTimeMillis and
+   * leaseAcquisitionTimeMillis values have not changed since this owner 
acquired the lease (indicating the lease did
+   * not expire).
+   * @return true if successfully updated, indicating no further actions need 
to be taken regarding this event.

Review Comment:
   it may be non-obvious, so let's describe the meaning of `false` as well 
(that the caller should continue seeking to acquire the lease, as if any 
actions it did successfully accomplish do not actually count)
   
   relatedly, minor suggestion on naming: `recordLeaseSuccess` or 
`markLeaseSuccess`
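
   To make the `false` case concrete, here is a minimal caller-side sketch. It 
is not code from the PR: `LeaseCompletionSketch`, `actUntilRecorded` and 
`maxAttempts` are hypothetical, and re-acquiring the lease between attempts is 
collapsed into the supplier for brevity.

```java
import java.util.function.BooleanSupplier;

// Sketch only: all names here are hypothetical.
final class LeaseCompletionSketch {

  /**
   * Runs the action, then asks the arbiter to record success.
   *
   * @param recordLeaseSuccess returns true if the lease was still valid, so the event is
   *        done; returns false if the lease had expired, in which case the work just
   *        performed does not count and the caller must try again
   * @return the attempt number that was recorded, or -1 if none was within maxAttempts
   */
  static int actUntilRecorded(Runnable action, BooleanSupplier recordLeaseSuccess, int maxAttempts) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      action.run();
      if (recordLeaseSuccess.getAsBoolean()) {
        return attempt;
      }
      // false: treat the attempt as if it never happened and go around again
    }
    return -1;
  }
}
```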



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This interface defines a generic approach to a non-blocking, multiple 
active thread or host system, in which one or
+ * more active instances compete over ownership of a particular flow's event. 
The type of flow event in question does
+ * not impact the algorithm other than to uniquely identify the flow event. 
Each instance uses the interface to initiate
+ * an attempt at ownership over the flow event and receives a response 
indicating the status of the attempt.
+ *
+ * At a high level the lease arbiter works as follows:
+ *  1. Multiple instances receive knowledge of a flow action event to act upon
+ *  2. Each instance attempts to acquire rights or `a lease` to be the sole 
instance acting on the event by calling the
+ *      tryAcquireLease method below and receives the resulting status. The 
status indicates whether this instance has
+ *        a) acquired the lease -> then this instance will attempt to complete 
the lease
+ *        b) another has acquired the lease -> then another will attempt to 
complete the lease
+ *        c) flow event no longer needs to be acted upon -> terminal state
+ *  3. If another has acquired the lease, then the instance will check back in 
at the time of lease expiry to see if it
+ *    needs to attempt the lease again [status (b) above].
+ *  4. Once the instance which acquired the lease completes its work on the 
flow event, it calls completeLeaseUse to
+ *    indicate to all other instances that the flow event no longer needs to 
be acted upon [status (c) above]
+ */
+public interface MultiActiveLeaseArbiter {
+  static final Logger LOG = 
LoggerFactory.getLogger(MultiActiveLeaseArbiter.class);
+
+  /**
+   * This method attempts to insert an entry into store for a particular flow 
action event if one does not already
+   * exist in the store for the flow action or has expired. Regardless of the 
outcome it also reads the lease
+   * acquisition timestamp of the entry for that flow action event (it could 
have pre-existed in the table or been newly
+   *  added by the previous write). Based on the transaction results, it will 
return @LeaseAttemptStatus to determine
+   *  the next action.
+   * @param flowAction uniquely identifies the flow
+   * @param eventTimeMillis is the time this flow action should occur

Review Comment:
   "...the flow and the present action upon it"
   
   "...the time `flowAction` was triggered"



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This interface defines a generic approach to a non-blocking, multiple 
active thread or host system, in which one or
+ * more active instances compete over ownership of a particular flow's event. 
The type of flow event in question does
+ * not impact the algorithm other than to uniquely identify the flow event. 
Each instance uses the interface to initiate
+ * an attempt at ownership over the flow event and receives a response 
indicating the status of the attempt.
+ *
+ * At a high level the lease arbiter works as follows:
+ *  1. Multiple instances receive knowledge of a flow action event to act upon
+ *  2. Each instance attempts to acquire rights or `a lease` to be the sole 
instance acting on the event by calling the
+ *      tryAcquireLease method below and receives the resulting status. The 
status indicates whether this instance has
+ *        a) acquired the lease -> then this instance will attempt to complete 
the lease
+ *        b) another has acquired the lease -> then another will attempt to 
complete the lease
+ *        c) flow event no longer needs to be acted upon -> terminal state
+ *  3. If another has acquired the lease, then the instance will check back in 
at the time of lease expiry to see if it
+ *    needs to attempt the lease again [status (b) above].
+ *  4. Once the instance which acquired the lease completes its work on the 
flow event, it calls completeLeaseUse to
+ *    indicate to all other instances that the flow event no longer needs to 
be acted upon [status (c) above]
+ */
+public interface MultiActiveLeaseArbiter {
+  static final Logger LOG = 
LoggerFactory.getLogger(MultiActiveLeaseArbiter.class);
+
+  /**
+   * This method attempts to insert an entry into store for a particular flow 
action event if one does not already
+   * exist in the store for the flow action or has expired. Regardless of the 
outcome it also reads the lease
+   * acquisition timestamp of the entry for that flow action event (it could 
have pre-existed in the table or been newly
+   *  added by the previous write). Based on the transaction results, it will 
return @LeaseAttemptStatus to determine
+   *  the next action.
+   * @param flowAction uniquely identifies the flow
+   * @param eventTimeMillis is the time this flow action should occur
+   * @return LeaseAttemptStatus
+   * @throws IOException
+   */
+  LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long 
eventTimeMillis) throws IOException;
+
+  /**
+   * This method is used to indicate the owner of the lease has successfully 
completed required actions while holding
+   * the lease of the flow action event. It marks the lease as "no longer 
leasing", if the eventTimeMillis and
+   * leaseAcquisitionTimeMillis values have not changed since this owner 
acquired the lease (indicating the lease did
+   * not expire).
+   * @return true if successfully updated, indicating no further actions need 
to be taken regarding this event.
+   */
+  boolean completeLeaseUse(DagActionStore.DagAction flowAction, long 
eventTimeMillis, long leaseAcquisitionTimeMillis)

Review Comment:
   to hammer home that this should ONLY be called by someone actually holding 
the lease, what about having its only param be `LeaseObtainedStatus`?
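
   A minimal in-memory sketch of that idea, under stated assumptions: 
`InMemoryArbiterSketch` and `ObtainedLease` are made-up names, `String` stands 
in for `DagActionStore.DagAction`, and the status is assumed to also carry the 
flow action (the `LeaseObtainedStatus` in this PR does not). The private 
constructor means only the arbiter can mint the token, so holding one is proof 
of having acquired a lease.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: a toy arbiter, not the MySQL-backed one from the PR.
final class InMemoryArbiterSketch {

  /** Token handed out on acquisition; cannot be constructed outside the arbiter. */
  static final class ObtainedLease {
    final String flowAction;
    final long eventTimeMillis;
    final long leaseAcquisitionTimeMillis;

    private ObtainedLease(String flowAction, long eventTimeMillis, long leaseAcquisitionTimeMillis) {
      this.flowAction = flowAction;
      this.eventTimeMillis = eventTimeMillis;
      this.leaseAcquisitionTimeMillis = leaseAcquisitionTimeMillis;
    }
  }

  // flowAction -> acquisition time of the lease currently in force
  private final Map<String, Long> currentLeases = new HashMap<>();

  /** Unconditionally grants a lease; a real arbiter would check for an unexpired one first. */
  ObtainedLease acquire(String flowAction, long eventTimeMillis, long nowMillis) {
    currentLeases.put(flowAction, nowMillis);
    return new ObtainedLease(flowAction, eventTimeMillis, nowMillis);
  }

  /** Returns true only if the given lease is still the one in force. */
  boolean recordLeaseSuccess(ObtainedLease lease) {
    Long current = currentLeases.get(lease.flowAction);
    if (current == null || current != lease.leaseAcquisitionTimeMillis) {
      return false; // lease was superseded or already completed
    }
    currentLeases.remove(lease.flowAction);
    return true;
  }
}
```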



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseObtainedStatus.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import lombok.Getter;
+
+/*
+The instance calling this method acquired the lease for the event in question. 
The class contains the `eventTimestamp`
+associated with the lease as well as the time the lease was obtained by me or 
`myLeaseAcquisitionTimestamp`.
+ */
+public class LeaseObtainedStatus extends LeaseAttemptStatus {
+  @Getter
+  private final long eventTimestamp;
+
+  @Getter
+  private final long myLeaseAcquisitionTimestamp;

Review Comment:
   nit: no need to clarify it's "mine"/"yours"... just `leaseAcq...Tstamp` is 
enough



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeaseObtainedStatus.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import lombok.Getter;
+
+/*
+The instance calling this method acquired the lease for the event in question. 
The class contains the `eventTimestamp`
+associated with the lease as well as the time the lease was obtained by me or 
`myLeaseAcquisitionTimestamp`.
+ */
+public class LeaseObtainedStatus extends LeaseAttemptStatus {
+  @Getter
+  private final long eventTimestamp;
+
+  @Getter
+  private final long myLeaseAcquisitionTimestamp;
+
+  protected LeaseObtainedStatus(long eventTimestamp, long 
myLeaseAcquisitionTimestamp) {
+    super();
+    this.eventTimestamp = eventTimestamp;
+    this.myLeaseAcquisitionTimestamp = myLeaseAcquisitionTimestamp;
+  }

Review Comment:
   replace w/ `@Data` (and `@RequiredArgsConstructor`, if also necessary)



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/SchedulerLeaseAlgoHandler.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Random;
+
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.LeaseAttemptStatus;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.scheduler.JobScheduler;
+import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.runtime.api.LeaseObtainedStatus;
+import org.apache.gobblin.runtime.api.LeasedToAnotherStatus;
+
+
+/**
+ * Handler used to coordinate multiple hosts with enabled schedulers to 
respond to flow action events. It uses the
+ * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter} to 
determine a single lease owner at a given time
+ * for a flow action event. After acquiring the lease, it persists the flow 
action event to the {@link DagActionStore}
+ * to be eventually acted upon by the host with the active DagManager. Once it 
has completed this action, it will mark
+ * the lease as completed by calling the
+ * {@link 
org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter.completeLeaseUse} 
method. Hosts that do not gain
+ * the lease for the event, instead schedule a reminder using the {@link 
SchedulerService} to check back in on the
+ * previous lease owner's completion status after the lease should expire to 
ensure the event is handled in failure
+ * cases.
+ */
+public class SchedulerLeaseAlgoHandler {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SchedulerLeaseAlgoHandler.class);

Review Comment:
   `@Slf4j`



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeasedToAnotherStatus.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import lombok.Getter;
+
+/*
+The event in question has been leased to another. This object contains 
`reminderEventTimestamp` which is the event
+timestamp the lease is associated with as well as `minimumReminderWaitMillis` 
the minimum amount of time to wait
+before returning to check if the lease has completed or expired.
+ */
+public class LeasedToAnotherStatus extends LeaseAttemptStatus {
+  @Getter
+  private final long reminderEventTimeMillis;
+
+  @Getter
+  private final long minimumReminderWaitMillis;

Review Comment:
   nit: I suggest against naming 'reminder', as that unnecessarily suggests how 
we expect the field to be used, and better to keep this class blissfully 
unaware of what goes on downstream.  instead, maybe adopt the "linger" naming, 
used elsewhere.  or else "lease expiration".
   
   I'm unclear whether this is meant to be an absolute timestamp or a relative 
duration (e.g. 5000 more millis).  the former may be preferable, since it's 
more resilient to unpredictable delays, like a long GC pause.
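
   The difference can be shown with a small arithmetic sketch. The class and 
method names are hypothetical and the numbers are invented. Note that an 
absolute timestamp assumes the participants' clocks are reasonably in sync.

```java
// Sketch only: hypothetical helper contrasting the two encodings.
final class LeaseExpirySketch {

  /** Absolute encoding: wait is recomputed against "now", so delays before reading shrink it. */
  static long remainingFromAbsolute(long leaseExpirationEpochMillis, long nowMillis) {
    return Math.max(0L, leaseExpirationEpochMillis - nowMillis);
  }

  /** Relative encoding: the wake-up time drifts by however long the reader was stalled. */
  static long wakeUpFromRelative(long minimumWaitMillis, long receivedAtMillis) {
    return receivedAtMillis + minimumWaitMillis;
  }
}
```

   For example, if the lease expires at t=10000, the status is produced at 
t=5000 (so the relative wait is 5000), and the reader stalls until t=8000, the 
absolute form yields a 2000 ms wait and wakes at 10000, while the relative form 
wakes at 13000.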



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**

Review Comment:
   some suggestions below... but overall, this is complicated documentation 
that you wrote up very well!



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/NoLongerLeasingStatus.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+/*
+This status is returned when a flow event was successfully leased and an 
instance completed the requirements for the
+event, so no further leasing is required.
+ */
+public class NoLongerLeasingStatus extends LeaseAttemptStatus {
+
+  protected NoLongerLeasingStatus() {
+    super();
+  }

Review Comment:
   I don't see a need for this to be `protected` (vs. just accepting the 
synthesized default ctor, which is `public` here). 
   
   also, NBD, but know that an implicit `super()` would be inserted for you if 
you don't explicitly call one of the super class's ctors
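
   A minimal sketch of both points, using hypothetical stand-in classes (the 
`...Sketch` names are not from the PR). Note that the synthesized default ctor 
takes the access level of its class, so it is `public` only when the class is:

```java
// Hypothetical stand-ins for the classes in the diff; bodies are illustrative only.
abstract class LeaseAttemptStatusSketch {
  final String createdBy;

  LeaseAttemptStatusSketch() {
    this.createdBy = "superclass no-arg constructor";
  }
}

// No constructor is declared here. The compiler synthesizes a default
// constructor with the same access as the class (package-private in this
// sketch, public for a public class), and it calls super() implicitly.
class NoLongerLeasingStatusSketch extends LeaseAttemptStatusSketch {
}

class ImplicitSuperDemo {
  public static void main(String[] args) {
    NoLongerLeasingStatusSketch status = new NoLongerLeasingStatusSketch();
    // The field was set by the superclass ctor, proving super() ran.
    System.out.println(status.createdBy);
  }
}
```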



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -200,9 +206,9 @@ public 
GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String ser
   public GobblinServiceJobScheduler(String serviceName, Config config, 
FlowStatusGenerator flowStatusGenerator,
       Optional<HelixManager> helixManager,
       Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> 
topologyCatalog, Optional<DagManager> dagManager, Optional<UserQuotaManager> 
quotaManager,
-      SchedulerService schedulerService,  Optional<Logger> log, boolean 
warmStandbyEnabled) throws Exception {
+      SchedulerService schedulerService,  Optional<Logger> log, boolean 
warmStandbyEnabled, boolean multiActiveSchedulerEnabled, 
SchedulerLeaseAlgoHandler schedulerLeaseAlgoHandler) throws Exception {
     this(serviceName, config, helixManager, flowCatalog, topologyCatalog,
-        new Orchestrator(config, flowStatusGenerator, topologyCatalog, 
dagManager, log), schedulerService, quotaManager, log, warmStandbyEnabled);
+        new Orchestrator(config, flowStatusGenerator, topologyCatalog, 
dagManager, log, multiActiveSchedulerEnabled, schedulerLeaseAlgoHandler), 
schedulerService, quotaManager, log, warmStandbyEnabled, 
multiActiveSchedulerEnabled, schedulerLeaseAlgoHandler);

Review Comment:
   seeing this repeated over and over `(boolean, SchedulerLeaseAlgoHandler)` 
makes me wonder... would `Optional<SchedulerLeaseAlgoHandler>` be sufficient?  
e.g. `Optional.absent()` stands in for when currently the `boolean` is `false`?
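
   A rough sketch of the idea, not the PR's actual classes: 
`LeaseHandlerSketch` and `OrchestratorSketch` are hypothetical names, and the 
sketch uses `java.util.Optional` (with `Optional.empty()`) only to stay 
self-contained, where the code under review would use `Optional.absent()`:

```java
import java.util.Optional;

// Hypothetical stand-in for the handler type named in the diff.
interface LeaseHandlerSketch {
  String handle(String flowName);
}

class OrchestratorSketch {
  private final Optional<LeaseHandlerSketch> leaseHandler;

  // One parameter replaces the (boolean, handler) pair: an empty Optional
  // means multi-active scheduling is disabled.
  OrchestratorSketch(Optional<LeaseHandlerSketch> leaseHandler) {
    this.leaseHandler = leaseHandler;
  }

  boolean isMultiActiveSchedulerEnabled() {
    return leaseHandler.isPresent();
  }

  String orchestrate(String flowName) {
    // Use the handler when present, otherwise fall back to direct submission.
    return leaseHandler
        .map(handler -> handler.handle(flowName))
        .orElse("submitted directly: " + flowName);
  }
}

class OptionalHandlerDemo {
  public static void main(String[] args) {
    OrchestratorSketch disabled = new OrchestratorSketch(Optional.empty());
    System.out.println(disabled.orchestrate("myFlow"));
  }
}
```

   The flag and the handler can then never disagree (e.g. `true` with a null 
handler), since there is only one value to pass around.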



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/LeasedToAnotherStatus.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import lombok.Getter;
+
+/*
+The event in question has been leased to another. This object contains 
`reminderEventTimestamp` which is the event

Review Comment:
   strictly speaking the event wasn't leased, but rather a lease was acquired 
to exclusively handle the event



##########
gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java:
##########
@@ -600,11 +600,17 @@ public static class GobblinJob extends BaseGobblinJob 
implements InterruptableJo
     @Override
     public void executeImpl(JobExecutionContext context)
         throws JobExecutionException {
-      LOG.info("Starting job " + context.getJobDetail().getKey());
-      JobDataMap dataMap = context.getJobDetail().getJobDataMap();
+      JobDetail jobDetail = context.getJobDetail();
+      LOG.info("Starting job " + jobDetail.getKey());
+      JobDataMap dataMap = jobDetail.getJobDataMap();
       JobScheduler jobScheduler = (JobScheduler) 
dataMap.get(JOB_SCHEDULER_KEY);
       Properties jobProps = (Properties) dataMap.get(PROPERTIES_KEY);
       JobListener jobListener = (JobListener) dataMap.get(JOB_LISTENER_KEY);
+      // Obtain trigger timestamp from trigger to pass to jobProps
+      Trigger trigger = context.getTrigger();
+      long triggerTimestampMillis = trigger.getPreviousFireTime().getTime();
+      
jobProps.setProperty(ConfigurationKeys.SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY,
+          String.valueOf(triggerTimestampMillis));

Review Comment:
   maybe it's seeing "previous" (fire time) used to populate "new" (event 
timestamp), but I'm unclear, when reading job props--what is indicated by 
"newEventTimestampMillis"?  please explain and/or propose a more 
self-explanatory name... perhaps "triggerTimestampMillis"?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -177,15 +207,46 @@ protected void processMessage(DecodeableKafkaRecord 
message) {
     dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
   }
 
+  protected void submitFlowToDagManager(String flowGroup, String flowName) {
+    // Retrieve job execution plan by recompiling the flow spec to send to the 
DagManager
+    FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
+    FlowSpec spec = null;
+    try {
+      URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
+      spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+      Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(spec);
+      //Send the dag to the DagManager.
+      dagManager.addDag(jobExecutionPlanDag, true, true);
+    } catch (URISyntaxException e) {
+      log.warn("Could not create URI object for flowId {} due to error {}", 
flowId, e.getMessage());
+      this.unexpectedErrors.mark();
+      return;
+    } catch (SpecNotFoundException e) {
+      log.warn("Spec not found for flow group: {} name: {} Exception: {}", 
flowGroup, flowName, e);

Review Comment:
   just above we use `flowId`... can we do same here?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -440,12 +446,24 @@ public synchronized void scheduleJob(Properties jobProps, 
JobListener jobListene
   public void runJob(Properties jobProps, JobListener jobListener) throws 
JobException {
     try {
       Spec flowSpec = 
this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
-      this.orchestrator.orchestrate(flowSpec);
+      String triggerTimestampMillis = extractTriggerTimestampMillis(jobProps);
+      this.orchestrator.orchestrate(flowSpec, jobProps, 
Long.parseLong(triggerTimestampMillis));
     } catch (Exception e) {
       throw new JobException("Failed to run Spec: " + 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
     }
   }
 
+  /*
+  Helper method used to extract the trigger timestamp from Properties object. 
If key for `original` trigger exists, then
+  we use that because this is a reminder event and the actual event trigger is 
the time we wanted to be reminded of the
+  original trigger.
+   */
+  public static String extractTriggerTimestampMillis(Properties jobProps) {

Review Comment:
   does this need to be `public`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java:
##########
@@ -54,27 +53,26 @@ public void 
resume(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, E
     String flowName = key.getKey().getFlowName();
     Long flowExecutionId = key.getKey().getFlowExecutionId();
     try {
-      // If an existing resume or kill request is still pending then do not 
accept this request
-      if (this.dagActionStore.exists(flowGroup, flowName, 
flowExecutionId.toString())) {
-        DagActionStore.DagActionValue action = 
this.dagActionStore.getDagAction(flowGroup, flowName, 
flowExecutionId.toString()).getDagActionValue();
-        this.handleException(flowGroup, flowName, flowExecutionId.toString(),
-            new RuntimeException("There is already a pending action " + action 
+ " for this flow. Please wait to resubmit and wait for"
+      // If an existing resume request is still pending then do not accept 
this request
+      if (this.dagActionStore.exists(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME)) {
+        this.handleException(flowGroup, flowName, flowExecutionId.toString(), 
DagActionStore.FlowActionType.RESUME,
+            new RuntimeException("There is already a pending RESUME action for 
this flow. Please wait to resubmit and wait for"
                 + " action to be completed."));
         return;
       }
-      this.dagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.DagActionValue.RESUME);
-    } catch (IOException | SQLException | SpecNotFoundException e) {
+      this.dagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME);
+    } catch (IOException | SQLException e) {
       log.warn(
           String.format("Failed to add execution resume action for flow %s %s 
%s to dag action store due to", flowGroup,
               flowName, flowExecutionId), e);
-      this.handleException(flowGroup, flowName, flowExecutionId.toString(), e);
+      this.handleException(flowGroup, flowName, flowExecutionId.toString(), 
DagActionStore.FlowActionType.RESUME, e);
     }
 
   }
 
-  private void handleException (String flowGroup, String flowName, String 
flowExecutionId, Exception e) {
+  private void handleException (String flowGroup, String flowName, String 
flowExecutionId, DagActionStore.FlowActionType flowActionType, Exception e) {
     try {
-      if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId)) {
+      if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId, 
flowActionType)) {

Review Comment:
   having a separate, subsequent, and repeated check of `exists` looks like a 
race condition.  instead whoever calls `handleException` should indicate 
whether or not it previously existed upon the first (and ideally only) call to 
`exists`
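
   To illustrate, a sketch under stated assumptions: the store and handler 
below are hypothetical in-memory stand-ins, and I'm assuming (the diff doesn't 
show it) that the point of the second check is to clean up only an action this 
request itself added:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the dag action store.
class DagActionStoreSketch {
  private final Set<String> actions = ConcurrentHashMap.newKeySet();

  boolean exists(String key) { return actions.contains(key); }
  void add(String key) { actions.add(key); }
  void delete(String key) { actions.remove(key); }
}

class ResumeHandlerSketch {
  private final DagActionStoreSketch store;

  ResumeHandlerSketch(DagActionStoreSketch store) {
    this.store = store;
  }

  // Returns true when the request was accepted.
  boolean resume(String key) {
    boolean alreadyPending = store.exists(key); // the only call to exists()
    if (alreadyPending) {
      handleException(key, true);
      return false;
    }
    store.add(key);
    return true;
  }

  // The caller states whether the action pre-existed, so no second lookup
  // is needed and the answer cannot change between the two checks.
  void handleException(String key, boolean existedBeforeRequest) {
    if (!existedBeforeRequest) {
      store.delete(key); // assumed cleanup of what this request added
    }
  }
}
```

   Note the check-then-add in `resume` is still not atomic; the sketch only 
removes the repeated `exists` call.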



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This interface defines a generic approach to a non-blocking, multiple 
active thread or host system, in which one or
+ * more active instances compete over ownership of a particular flow's event. 
The type of flow event in question does
+ * not impact the algorithm other than to uniquely identify the flow event. 
Each instance uses the interface to initiate
+ * an attempt at ownership over the flow event and receives a response 
indicating the status of the attempt.
+ *
+ * At a high level the lease arbiter works as follows:
+ *  1. Multiple instances receive knowledge of a flow action event to act upon
+ *  2. Each instance attempts to acquire rights or `a lease` to be the sole 
instance acting on the event by calling the
+ *      tryAcquireLease method below and receives the resulting status. The 
status indicates whether this instance has
+ *        a) acquired the lease -> then this instance will attempt to complete 
the lease
+ *        b) another has acquired the lease -> then another will attempt to 
complete the lease
+ *        c) flow event no longer needs to be acted upon -> terminal state
+ *  3. If another has acquired the lease, then the instance will check back in 
at the time of lease expiry to see if it

Review Comment:
   "if another participant acquired the lease before this one could, then the 
present participant must..."
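
   For readers following along, the three outcomes in step 2 of the javadoc 
can be sketched in memory. Only `tryAcquireLease` is a name from the diff; the 
enum, the class, and `recordLeaseSuccess` are hypothetical here, and lease 
expiry and reminders (step 3) are deliberately left out:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical names for the three outcomes described in the javadoc.
enum LeaseOutcomeSketch { LEASE_OBTAINED, LEASED_TO_ANOTHER, NO_LONGER_LEASING }

class InMemoryLeaseArbiterSketch {
  private static final String COMPLETED = "<completed>";
  private final Map<String, String> ownerByEvent = new ConcurrentHashMap<>();

  LeaseOutcomeSketch tryAcquireLease(String eventKey, String instanceId) {
    // putIfAbsent is atomic, so exactly one participant becomes the owner.
    String current = ownerByEvent.putIfAbsent(eventKey, instanceId);
    if (current == null || current.equals(instanceId)) {
      return LeaseOutcomeSketch.LEASE_OBTAINED;
    }
    if (COMPLETED.equals(current)) {
      return LeaseOutcomeSketch.NO_LONGER_LEASING;
    }
    return LeaseOutcomeSketch.LEASED_TO_ANOTHER;
  }

  // Marks the event as handled, but only if the caller still owns the lease.
  void recordLeaseSuccess(String eventKey, String instanceId) {
    ownerByEvent.replace(eventKey, instanceId, COMPLETED);
  }
}

class LeaseArbiterDemo {
  public static void main(String[] args) {
    InMemoryLeaseArbiterSketch arbiter = new InMemoryLeaseArbiterSketch();
    System.out.println(arbiter.tryAcquireLease("flowA-trigger", "host1"));
    System.out.println(arbiter.tryAcquireLease("flowA-trigger", "host2"));
    arbiter.recordLeaseSuccess("flowA-trigger", "host1");
    System.out.println(arbiter.tryAcquireLease("flowA-trigger", "host2"));
  }
}
```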





Issue Time Tracking
-------------------

    Worklog Id:     (was: 864298)
    Time Spent: 8h 10m  (was: 8h)

> Implement multi-active, non blocking for leader host
> ----------------------------------------------------
>
>                 Key: GOBBLIN-1837
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1837
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 8h 10m
>  Remaining Estimate: 0h
>
> This task will include the implementation of non-blocking, multi-active 
> scheduler for each host. It will NOT include metric emission or unit tests 
> for validation. That will be done in a separate follow-up ticket. The work in 
> this ticket includes
>  * define a table to do scheduler lease determination for each flow's trigger 
> event and related methods to execute actions on this table
>  * update DagActionStore schema and DagActionStoreMonitor to act upon new 
> "LAUNCH" type events in addition to KILL/RESUME
>  * update scheduler/orchestrator logic to apply the non-blocking algorithm 
> when "multi-active scheduler mode" is enabled, otherwise submit events 
> directly to the DagManager after receiving a scheduler trigger
>  * implement the non-blocking algorithm, particularly handling reminder 
> events if another host is in the process of securing the lease for a 
> particular flow trigger



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
