This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new faa3a4f60 [GOBBLIN-1837] Implement multi-active, non blocking for 
leader host (#3700)
faa3a4f60 is described below

commit faa3a4f60b6ef763d620245b1535b75728a9802e
Author: umustafi <[email protected]>
AuthorDate: Thu Jun 15 10:10:07 2023 -0700

    [GOBBLIN-1837] Implement multi-active, non blocking for leader host (#3700)
    
    * basic outline of changes to make, started SchedulerLeaseDeterminationStore
    
    * multiple query options for store
    
    * add launch type as column
    
    * wip for scheduler abstractions
    
    * non blocking algo impl, dag action store updates
    
    * DagActionMonitor changes to handle LAUNCH events
    
    * clean up comments, add docstrings
    
    * redefined lease arbiter & algo handler to separate scheduler specific 
logic from general lease handler
    
    * Address second round of review comments
    
    * Cleanup in response to review, fix failing test
    
    * small clean ups
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../gobblin/configuration/ConfigurationKeys.java   |  15 +
 .../apache/gobblin/service/ServiceConfigKeys.java  |   1 +
 .../src/main/avro/DagActionStoreChangeEvent.avsc   |  18 +
 .../apache/gobblin/runtime/api/DagActionStore.java |  55 ++-
 .../runtime/api/MultiActiveLeaseArbiter.java       | 103 ++++++
 .../runtime/api/MysqlMultiActiveLeaseArbiter.java  | 391 +++++++++++++++++++++
 .../dag_action_store/MysqlDagActionStore.java      |  64 ++--
 .../gobblin/runtime/metrics/RuntimeMetrics.java    |   3 +
 .../gobblin/runtime/util/InjectionNames.java       |   3 +
 .../org/apache/gobblin/scheduler/JobScheduler.java |  15 +-
 .../dag_action_store/MysqlDagActionStoreTest.java  |  43 +--
 .../modules/core/GobblinServiceConfiguration.java  |   4 +
 .../modules/core/GobblinServiceGuiceModule.java    |  12 +
 .../service/modules/orchestration/DagManager.java  |  21 +-
 .../modules/orchestration/FlowTriggerHandler.java  | 179 ++++++++++
 .../modules/orchestration/Orchestrator.java        |  62 +++-
 .../modules/orchestration/TimingEventUtils.java    |   4 +-
 ...lowExecutionResourceHandlerWithWarmStandby.java |  53 ++-
 .../scheduler/GobblinServiceJobScheduler.java      |  20 +-
 .../monitoring/DagActionStoreChangeMonitor.java    |  89 +++--
 .../DagActionStoreChangeMonitorFactory.java        |  17 +-
 .../monitoring/SpecStoreChangeMonitorFactory.java  |   2 +-
 .../modules/orchestration/DagManagerFlowTest.java  |   6 +-
 .../modules/orchestration/OrchestratorTest.java    |  10 +-
 .../scheduler/GobblinServiceJobSchedulerTest.java  |  10 +-
 25 files changed, 1000 insertions(+), 200 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index f0e15bf94..b155e8089 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -95,6 +95,21 @@ public class ConfigurationKeys {
   public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
   public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = 
"skip.scheduling.flows.after.num.days";
   public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
+  // Scheduler lease determination store configuration
+  public static final String MYSQL_LEASE_ARBITER_PREFIX = 
"MysqlMultiActiveLeaseArbiter";
+  public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = 
MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
+  public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE 
= MYSQL_LEASE_ARBITER_PREFIX + 
".gobblin_multi_active_scheduler_constants_store";
+  public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY 
= MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiterTable";
+  public static final String 
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = 
MYSQL_LEASE_ARBITER_PREFIX + ".gobblin_scheduler_lease_determination_store";
+  public static final String SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY = 
"eventToRevisitTimestampMillis";
+  public static final String SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY = 
"triggerEventTimestampMillis";
+  public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = 
MYSQL_LEASE_ARBITER_PREFIX + ".epsilonMillis";
+  public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 5000;
+  // Note: linger should be on the order of seconds even though we measure in 
millis
+  public static final String SCHEDULER_EVENT_LINGER_MILLIS_KEY = 
MYSQL_LEASE_ARBITER_PREFIX + ".lingerMillis";
+  public static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 30000;
+  public static final String SCHEDULER_MAX_BACKOFF_MILLIS_KEY = 
MYSQL_LEASE_ARBITER_PREFIX + ".maxBackoffMillis";
+  public static final int DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS = 5000;
 
   // Job executor thread pool size
   public static final String JOB_EXECUTOR_THREAD_POOL_SIZE_KEY = 
"jobexecutor.threadpool.size";
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index ef4323538..21b32b58c 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -41,6 +41,7 @@ public class ServiceConfigKeys {
   public static final boolean DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED = 
false;
   public static final String GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY = 
GOBBLIN_SERVICE_PREFIX + "jobStatusMonitor.enabled";
   public static final String GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY = 
GOBBLIN_SERVICE_PREFIX + "warmStandby.enabled";
+  public static final String 
GOBBLIN_SERVICE_MULTI_ACTIVE_SCHEDULER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + 
"multiActiveScheduler.enabled";
   // If true, will mark up/down d2 servers on leadership so that all requests 
will be routed to the leader node
   public static final String GOBBLIN_SERVICE_D2_ONLY_ANNOUNCE_LEADER = 
GOBBLIN_SERVICE_PREFIX + "d2.onlyAnnounceLeader";
 
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
index 268f18ad0..b628f1714 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
@@ -23,6 +23,24 @@
     "type" : "string",
     "doc" : "flow execution id for the dag action",
     "compliance" : "NONE"
+  }, {
+    "name" : "dagAction",
+    "type": {
+      "type": "enum",
+      "name": "DagActionValue",
+      "symbols": [
+        "KILL",
+        "RESUME",
+        "LAUNCH"
+      ],
+      "symbolDocs": {
+        "KILL": "Kill the flow corresponding to this dag",
+        "RESUME": "Resume or start a new flow corresponding to this dag",
+        "LAUNCH": "Launch a new execution of the flow corresponding to this 
dag"
+      }
+    },
+    "doc" : "type of dag action",
+    "compliance" : "NONE"
   }
   ]
 }
\ No newline at end of file
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
index 5da8e6d31..a1a0ea237 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
@@ -20,29 +20,26 @@ package org.apache.gobblin.runtime.api;
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.Collection;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
+
+import lombok.Data;
 
 
 public interface DagActionStore {
-  enum DagActionValue {
-    KILL,
-    RESUME
+  enum FlowActionType {
+    KILL, // Kill invoked through API call
+    RESUME, // Resume flow invoked through API call
+    LAUNCH, // Launch new flow execution invoked adhoc or through scheduled 
trigger
+    RETRY, // Invoked through DagManager for flows configured to allow retries
+    CANCEL, // Invoked through DagManager if flow has been stuck in 
Orchestrated state for a while
+    ADVANCE // Launch next step in multi-hop dag
   }
 
-  @Getter
-  @EqualsAndHashCode
+  @Data
   class DagAction {
-    String flowGroup;
-    String flowName;
-    String flowExecutionId;
-    DagActionValue dagActionValue;
-    public DagAction(String flowGroup, String flowName, String 
flowExecutionId, DagActionValue dagActionValue) {
-      this.flowGroup = flowGroup;
-      this.flowName = flowName;
-      this.flowExecutionId = flowExecutionId;
-      this.dagActionValue = dagActionValue;
-    }
+    final String flowGroup;
+    final String flowName;
+    final String flowExecutionId;
+    final FlowActionType flowActionType;
   }
 
 
@@ -51,40 +48,28 @@ public interface DagActionStore {
    * @param flowGroup flow group for the dag action
    * @param flowName flow name for the dag action
    * @param flowExecutionId flow execution for the dag action
+   * @param flowActionType the value of the dag action
    * @throws IOException
    */
-  boolean exists(String flowGroup, String flowName, String flowExecutionId) 
throws IOException, SQLException;
+  boolean exists(String flowGroup, String flowName, String flowExecutionId, 
FlowActionType flowActionType) throws IOException, SQLException;
 
   /**
    * Persist the dag action in {@link DagActionStore} for durability
    * @param flowGroup flow group for the dag action
    * @param flowName flow name for the dag action
    * @param flowExecutionId flow execution for the dag action
-   * @param dagActionValue the value of the dag action
+   * @param flowActionType the value of the dag action
    * @throws IOException
    */
-  void addDagAction(String flowGroup, String flowName, String flowExecutionId, 
DagActionValue dagActionValue) throws IOException;
+  void addDagAction(String flowGroup, String flowName, String flowExecutionId, 
FlowActionType flowActionType) throws IOException;
 
   /**
    * delete the dag action from {@link DagActionStore}
-   * @param flowGroup flow group for the dag action
-   * @param flowName flow name for the dag action
-   * @param flowExecutionId flow execution for the dag action
+   * @param dagAction containing all information needed to identify dag and 
specific action value
    * @throws IOException
    * @return true if we successfully delete one record, return false if the 
record does not exist
    */
-  boolean deleteDagAction(String flowGroup, String flowName, String 
flowExecutionId) throws IOException;
-
-  /***
-   * Retrieve action value by the flow group, flow name and flow execution id 
from the {@link DagActionStore}.
-   * @param flowGroup flow group for the dag action
-   * @param flowName flow name for the dag action
-   * @param flowExecutionId flow execution for the dag action
-   * @throws IOException Exception in retrieving the {@link DagAction}.
-   * @throws SpecNotFoundException If {@link DagAction} being retrieved is not 
present in store.
-   */
-  DagAction getDagAction(String flowGroup, String flowName, String 
flowExecutionId) throws IOException, SpecNotFoundException,
-                                                                               
            SQLException;
+  boolean deleteDagAction(DagAction dagAction) throws IOException;
 
   /***
    * Get all {@link DagAction}s from the {@link DagActionStore}.
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
new file mode 100644
index 000000000..ab9e03599
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import lombok.Data;
+
+
+/**
+ * This interface defines a generic approach to a non-blocking, multiple 
active thread or host system, in which one or
+ * more active participants compete to take responsibility for a particular 
flow's event. The type of flow event in
+ * question does not impact the algorithm other than to uniquely identify the 
flow event. Each participant uses the
+ * interface to initiate an attempt at ownership over the flow event and 
receives a response indicating the status of
+ * the attempt.
+ *
+ * At a high level the lease arbiter works as follows:
+ *  1. Multiple participants independently learn of a flow action event to act 
upon
+ *  2. Each participant attempts to acquire rights or `a lease` to be the sole 
participant acting on the event by
+ *     calling the tryAcquireLease method below and receives the resulting 
status. The status indicates whether this
+ *     participant has
+ *        a) LeaseObtainedStatus -> this participant will attempt to carry out 
the required action before the lease expires
+ *        b) LeasedToAnotherStatus -> another will attempt to carry out the 
required action before the lease expires
+ *        c) NoLongerLeasingStatus -> flow event no longer needs to be acted 
upon (terminal state)
+ *  3. If another participant has acquired the lease before this one could, 
then the present participant must check back
+ *    in at the time of lease expiry to see if it needs to attempt the lease 
again [status (b) above].
+ *  4. Once the participant which acquired the lease completes its work on the 
flow event, it calls recordLeaseSuccess
+ *    to indicate to all other participants that the flow event no longer 
needs to be acted upon [status (c) above]
+ */
+public interface MultiActiveLeaseArbiter {
+  /**
+   * This method attempts to insert an entry into store for a particular flow 
action event if one does not already
+   * exist in the store for the flow action or has expired. Regardless of the 
outcome it also reads the lease
+   * acquisition timestamp of the entry for that flow action event (it could 
have pre-existed in the table or been newly
+   * added by the previous write). Based on the transaction results, it will 
return {@link LeaseAttemptStatus} to
+   * determine the next action.
+   * @param flowAction uniquely identifies the flow and the present action 
upon it
+   * @param eventTimeMillis is the time this flow action was triggered
+   * @return LeaseAttemptStatus
+   * @throws IOException
+   */
+  LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long 
eventTimeMillis) throws IOException;
+
+  /**
+   * This method is used to indicate the owner of the lease has successfully 
completed required actions while holding
+   * the lease of the flow action event. It marks the lease as "no longer 
leasing", if the eventTimeMillis and
+   * leaseAcquisitionTimeMillis values have not changed since this owner 
acquired the lease (indicating the lease did
+   * not expire).
+   * @return true if successfully updated, indicating no further actions need 
to be taken regarding this event.
+   *         false if failed to update the lease properly, the caller should 
continue seeking to acquire the lease as
+   *         if any actions it did successfully accomplish do not count
+   */
+  boolean recordLeaseSuccess(LeaseObtainedStatus status) throws IOException;
+
+  /*
+   Class used to encapsulate status of lease acquisition attempt and 
derivations should contain information specific to
+   the status that results.
+   */
+  abstract class LeaseAttemptStatus {}
+
+  class NoLongerLeasingStatus extends LeaseAttemptStatus {}
+
+  /*
+  The participant calling this method acquired the lease for the event in 
question. The class contains the
+  `eventTimestamp` associated with the lease as well as the time the caller 
obtained the lease or
+  `leaseAcquisitionTimestamp`.
+  */
+  @Data
+  class LeaseObtainedStatus extends LeaseAttemptStatus {
+    private final DagActionStore.DagAction flowAction;
+    private final long eventTimestamp;
+    private final long leaseAcquisitionTimestamp;
+  }
+
+  /*
+  This flow action event already has a valid lease owned by another 
participant.
+  `eventTimeMillis` is the timestamp the lease is associated with, which may 
be a different timestamp for the same flow
+  action corresponding to the same instance of the event or a distinct one.
+  `minimumLingerDurationMillis` is the minimum amount of time to wait before 
this participant should return to check if
+  the lease has completed or expired
+   */
+  @Data
+  class LeasedToAnotherStatus extends LeaseAttemptStatus {
+    private final DagActionStore.DagAction flowAction;
+    private final long eventTimeMillis;
+    private final long minimumLingerDurationMillis;
+}
+}
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
new file mode 100644
index 000000000..8a40c71b2
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import javax.sql.DataSource;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * MySQL based implementation of the {@link MultiActiveLeaseArbiter} which 
uses a MySQL store to resolve ownership of
+ * a flow event amongst multiple competing participants. A MySQL table is used 
to store flow identifying information as
+ * well as the flow action associated with it. It uses two additional values 
of the `event_timestamp` and
+ * `lease_acquisition_timestamp` to indicate an active lease, expired lease, 
and state of no longer leasing. The table
+ * schema is as follows:
+ * [flow_group | flow_name | flow_execution_id | flow_action | event_timestamp 
| lease_acquisition_timestamp]
+ * (----------------------primary key------------------------)
+ * We also maintain another table in the database with two constants that 
allow us to coordinate between participants and
+ * ensure they are using the same values to base their coordination off of.
+ * [epsilon | linger]
+ * `epsilon` - time within which we consider two timestamps to be the same, to 
account for between-host clock drift
+ * `linger` - minimum time to occur before another host may attempt a lease on 
a flow event. It should be much greater
+ *            than epsilon and encapsulate executor communication latency 
including retry attempts
+ *
+ * The `event_timestamp` is the time of the flow_action event request.
+ * ---Event consolidation---
+ * Note that for the sake of simplification, we only allow one event 
associated with a particular flow's flow_action
+ * (ie: only one LAUNCH for example of flow FOO, but there can be a LAUNCH, 
KILL, & RESUME for flow FOO at once) during
+ * the time it takes to execute the flow action. In most cases, the execution 
time should be so negligible that this
+ * event consolidation of duplicate flow action requests is not noticed and 
even during executor downtime this behavior
+ * is acceptable as the user generally expects a timely execution of the most 
recent request rather than one execution
+ * per request.
+ *
+ * The `lease_acquisition_timestamp` is the time a host acquired ownership of 
this flow action, and it is valid for
+ * `linger` period of time after which it expires and any host can re-attempt 
ownership. In most cases, the original
+ * host should actually complete its work while having the lease and then mark 
the flow action as NULL to indicate no
+ * further leasing should be done for the event.
+ */
+@Slf4j
+public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
+  /** `j.u.Function` variant for an operation that may @throw IOException or 
SQLException: preserves method signature checked exceptions */
+  @FunctionalInterface
+  protected interface CheckedFunction<T, R> {
+    R apply(T t) throws IOException, SQLException;
+  }
+
+  protected final DataSource dataSource;
+  private final String leaseArbiterTableName;
+  private final String constantsTableName;
+  private final int epsilon;
+  private final int linger;
+
+  // TODO: define retention on this table
+  private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE 
TABLE IF NOT EXISTS %S ("
+      + "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") 
NOT NULL, flow_name varchar("
+      + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + 
"flow_execution_id varchar("
+      + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, 
flow_action varchar(100) NOT NULL, "
+      + "event_timestamp TIMESTAMP, "
+      + "lease_acquisition_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"
+      + "PRIMARY KEY (flow_group,flow_name,flow_execution_id,flow_action))";
+  private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE 
IF NOT EXISTS %s "
+      + "(epsilon INT, linger INT), PRIMARY KEY (epsilon, linger); INSERT INTO 
%s (epsilon, linger) VALUES (?,?)";
+  protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE 
flow_group=? AND flow_name=? AND flow_execution_id=?"
+      + " AND flow_action=?";
+  protected static final String WHERE_CLAUSE_TO_MATCH_ROW = 
WHERE_CLAUSE_TO_MATCH_KEY
+      + " AND event_timestamp=? AND lease_acquisition_timestamp=?";
+  protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT 
ROW_COUNT() AS rows_inserted_count, "
+      + "lease_acquisition_timestamp, linger FROM %s, %s " + 
WHERE_CLAUSE_TO_MATCH_KEY;
+  // Does a cross join between the two tables to have epsilon and linger 
values available. Returns the following values:
+  // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if 
event_timestamp in table is within
+  // epsilon), leaseValidityStatus (1 if lease has not expired, 2 if expired, 
3 if column is NULL or no longer leasing)
+  protected static final String GET_EVENT_INFO_STATEMENT = "SELECT 
event_timestamp, lease_acquisition_timestamp, "
+      + "abs(event_timestamp - ?) <= epsilon as isWithinEpsilon, CASE "
+      + "WHEN CURRENT_TIMESTAMP < (lease_acquisition_timestamp + linger) then 
1"
+      + "WHEN CURRENT_TIMESTAMP >= (lease_acquisition_timestamp + linger) then 
2"
+      + "ELSE 3 END as leaseValidityStatus, linger FROM %s, %s " + 
WHERE_CLAUSE_TO_MATCH_KEY;
+  // Insert or update row to acquire lease if values have not changed since 
the previous read
+  // Need to define three separate statements to handle cases where row does 
not exist or has null values to check
+  protected static final String 
CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s "
+      + "(flow_group, flow_name, flow_execution_id, flow_action, 
event_timestamp) VALUES (?, ?, ?, ?, ?)";
+  protected static final String 
CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT = "UPDATE %s "
+      + "SET event_timestamp=?" + WHERE_CLAUSE_TO_MATCH_KEY
+      + " AND event_timestamp=? AND lease_acquisition_timestamp is NULL";
+  protected static final String 
CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT = "UPDATE %s "
+      + "SET event_timestamp=?" + WHERE_CLAUSE_TO_MATCH_ROW
+      + " AND event_timestamp=? AND lease_acquisition_timestamp=?";
+  // Complete lease acquisition if values have not changed since lease was 
acquired
+  protected static final String CONDITIONALLY_COMPLETE_LEASE_STATEMENT = 
"UPDATE %s SET "
+      + "lease_acquisition_timestamp = NULL " + WHERE_CLAUSE_TO_MATCH_ROW;
+
+  @Inject
+  public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
+    if (config.hasPath(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX)) {
+      config = 
config.getConfig(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX).withFallback(config);
+    } else {
+      throw new IOException(String.format("Please specify the config for 
MysqlMultiActiveLeaseArbiter using prefix %s "
+          + "before all properties", 
ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX));
+    }
+
+    this.leaseArbiterTableName = ConfigUtils.getString(config, 
ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY,
+        
ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE);
+    this.constantsTableName = ConfigUtils.getString(config, 
ConfigurationKeys.MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY,
+        ConfigurationKeys.DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE);
+    this.epsilon = ConfigUtils.getInt(config, 
ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY,
+        ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS);
+    this.linger = ConfigUtils.getInt(config, 
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
+        ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
+    this.dataSource = MysqlDataSourceFactory.get(config, 
SharedResourcesBrokerFactory.getImplicitBroker());
+    try (Connection connection = dataSource.getConnection();
+        PreparedStatement createStatement = 
connection.prepareStatement(String.format(
+            CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName))) {
+      createStatement.executeUpdate();
+      connection.commit();
+    } catch (SQLException e) {
+      throw new IOException("Table creation failure for " + 
leaseArbiterTableName, e);
+    }
+    withPreparedStatement(String.format(CREATE_CONSTANTS_TABLE_STATEMENT, 
this.constantsTableName, this.constantsTableName),
+        createStatement -> {
+      int i = 0;
+      createStatement.setInt(++i, epsilon);
+      createStatement.setInt(++i, linger);
+      return createStatement.executeUpdate();}, true);
+  }
+
+  @Override
+  public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction 
flowAction, long eventTimeMillis)
+      throws IOException {
+    // Check table for an existing entry for this flow action and event time
+    ResultSet resultSet = withPreparedStatement(
+        String.format(GET_EVENT_INFO_STATEMENT, this.leaseArbiterTableName, 
this.constantsTableName),
+        getInfoStatement -> {
+          int i = 0;
+          getInfoStatement.setTimestamp(++i, new Timestamp(eventTimeMillis));
+          getInfoStatement.setString(++i, flowAction.getFlowGroup());
+          getInfoStatement.setString(++i, flowAction.getFlowName());
+          getInfoStatement.setString(++i, flowAction.getFlowExecutionId());
+          getInfoStatement.setString(++i, 
flowAction.getFlowActionType().toString());
+          return getInfoStatement.executeQuery();
+        }, true);
+
+    String formattedSelectAfterInsertStatement =
+        String.format(SELECT_AFTER_INSERT_STATEMENT, 
this.leaseArbiterTableName, this.constantsTableName);
+    try {
+      // CASE 1: If no existing row for this flow action, then go ahead and 
insert
+      if (!resultSet.next()) {
+        String formattedAcquireLeaseNewRowStatement =
+            String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, 
this.leaseArbiterTableName);
+        ResultSet rs = withPreparedStatement(
+            formattedAcquireLeaseNewRowStatement + "; " + 
formattedSelectAfterInsertStatement,
+            insertStatement -> {
+              completeInsertPreparedStatement(insertStatement, flowAction, 
eventTimeMillis);
+              return insertStatement.executeQuery();
+            }, true);
+       return handleResultFromAttemptedLeaseObtainment(rs, flowAction, 
eventTimeMillis);
+      }
+
+      // Extract values from result set
+      Timestamp dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
+      Timestamp dbLeaseAcquisitionTimestamp = 
resultSet.getTimestamp("lease_acquisition_timestamp");
+      boolean isWithinEpsilon = resultSet.getBoolean("isWithinEpsilon");
+      int leaseValidityStatus = resultSet.getInt("leaseValidityStatus");
+      int dbLinger = resultSet.getInt("linger");
+
+      // CASE 2: If our event timestamp is older than the last event in db, 
then skip this trigger
+      if (eventTimeMillis < dbEventTimestamp.getTime()) {
+        return new NoLongerLeasingStatus();
+      }
+      // Lease is valid
+      if (leaseValidityStatus == 1) {
+        // CASE 3: Same event, lease is valid
+        if (isWithinEpsilon) {
+          // Utilize db timestamp for reminder
+          return new LeasedToAnotherStatus(flowAction, 
dbEventTimestamp.getTime(),
+              dbLeaseAcquisitionTimestamp.getTime() + dbLinger - 
System.currentTimeMillis());
+        }
+        // CASE 4: Distinct event, lease is valid
+        // Utilize db timestamp for wait time, but be reminded of own event 
timestamp
+        return new LeasedToAnotherStatus(flowAction, eventTimeMillis,
+            dbLeaseAcquisitionTimestamp.getTime() + dbLinger  - 
System.currentTimeMillis());
+      }
+      // CASE 5: Lease is out of date (regardless of whether same or distinct 
event)
+      else if (leaseValidityStatus == 2) {
+        if (isWithinEpsilon) {
+          log.warn("Lease should not be out of date for the same trigger event 
since epsilon << linger for flowAction"
+                  + " {}, db eventTimestamp {}, db leaseAcquisitionTimestamp 
{}, linger {}", flowAction,
+              dbEventTimestamp, dbLeaseAcquisitionTimestamp, dbLinger);
+        }
+        // Use our event to acquire lease, check for previous db 
eventTimestamp and leaseAcquisitionTimestamp
+        String formattedAcquireLeaseIfMatchingAllStatement =
+            
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT, 
this.leaseArbiterTableName);
+        ResultSet rs = withPreparedStatement(
+            formattedAcquireLeaseIfMatchingAllStatement + "; " + 
formattedSelectAfterInsertStatement,
+            updateStatement -> {
+              completeUpdatePreparedStatement(updateStatement, flowAction, 
eventTimeMillis, true,
+                  true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
+              return updateStatement.executeQuery();
+            }, true);
+        return handleResultFromAttemptedLeaseObtainment(rs, flowAction, 
eventTimeMillis);
+      } // No longer leasing this event
+        // CASE 6: Same event, no longer leasing event in db: terminate
+        if (isWithinEpsilon) {
+          return new NoLongerLeasingStatus();
+        }
+        // CASE 7: Distinct event, no longer leasing event in db
+        // Use our event to acquire lease, check for previous db 
eventTimestamp and NULL leaseAcquisitionTimestamp
+        String formattedAcquireLeaseIfFinishedStatement =
+            
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, 
this.leaseArbiterTableName);
+        ResultSet rs = withPreparedStatement(
+            formattedAcquireLeaseIfFinishedStatement + "; " + 
formattedSelectAfterInsertStatement,
+            updateStatement -> {
+              completeUpdatePreparedStatement(updateStatement, flowAction, 
eventTimeMillis, true,
+                  false, dbEventTimestamp, null);
+              return updateStatement.executeQuery();
+            }, true);
+        return handleResultFromAttemptedLeaseObtainment(rs, flowAction, 
eventTimeMillis);
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Attempt lease by insert or update following a read based on the condition 
the state of the table has not changed
+   * since the read. Parse the result to return the corresponding status based 
on successful insert/update or not.
+   * @param resultSet
+   * @param eventTimeMillis
+   * @return LeaseAttemptStatus
+   * @throws SQLException
+   * @throws IOException
+   */
+  protected LeaseAttemptStatus 
handleResultFromAttemptedLeaseObtainment(ResultSet resultSet,
+      DagActionStore.DagAction flowAction, long eventTimeMillis)
+      throws SQLException, IOException {
+    if (!resultSet.next()) {
+      throw new IOException("Expected num rows and lease_acquisition_timestamp 
returned from query but received nothing");
+    }
+    int numRowsUpdated = resultSet.getInt(1);
+    long leaseAcquisitionTimeMillis = resultSet.getTimestamp(2).getTime();
+    int dbLinger = resultSet.getInt(3);
+    if (numRowsUpdated == 1) {
+      return new LeaseObtainedStatus(flowAction, eventTimeMillis, 
leaseAcquisitionTimeMillis);
+    }
+    // Another participant acquired lease in between
+    return new LeasedToAnotherStatus(flowAction, eventTimeMillis,
+        leaseAcquisitionTimeMillis + dbLinger - System.currentTimeMillis());
+  }
+
+  /**
+   * Complete the INSERT statement for a new flow action lease where the flow 
action is not present in the table
+   * @param statement
+   * @param flowAction
+   * @param eventTimeMillis
+   * @throws SQLException
+   */
+  protected void completeInsertPreparedStatement(PreparedStatement statement, 
DagActionStore.DagAction flowAction,
+      long eventTimeMillis) throws SQLException {
+    int i = 0;
+    // Values to set in new row
+    statement.setString(++i, flowAction.getFlowGroup());
+    statement.setString(++i, flowAction.getFlowName());
+    statement.setString(++i, flowAction.getFlowExecutionId());
+    statement.setString(++i, flowAction.getFlowActionType().toString());
+    statement.setTimestamp(++i, new Timestamp(eventTimeMillis));
+    // Values to check if existing row matches previous read
+    statement.setString(++i, flowAction.getFlowGroup());
+    statement.setString(++i, flowAction.getFlowName());
+    statement.setString(++i, flowAction.getFlowExecutionId());
+    statement.setString(++i, flowAction.getFlowActionType().toString());
+    // Values to select for return
+    statement.setString(++i, flowAction.getFlowGroup());
+    statement.setString(++i, flowAction.getFlowName());
+    statement.setString(++i, flowAction.getFlowExecutionId());
+    statement.setString(++i, flowAction.getFlowActionType().toString());
+  }
+
+  /**
+   * Complete the UPDATE prepared statements for a flow action that already 
exists in the table that needs to be
+   * updated.
+   * @param statement
+   * @param flowAction
+   * @param eventTimeMillis
+   * @param needEventTimeCheck true if need to compare 
`originalEventTimestamp` with db event_timestamp
+   * @param needLeaseAcquisitionTimeCheck true if need to compare 
`originalLeaseAcquisitionTimestamp` with db one
+   * @param originalEventTimestamp value to compare to db one, null if not 
needed
+   * @param originalLeaseAcquisitionTimestamp value to compare to db one, null 
if not needed
+   * @throws SQLException
+   */
+  protected void completeUpdatePreparedStatement(PreparedStatement statement, 
DagActionStore.DagAction flowAction,
+      long eventTimeMillis, boolean needEventTimeCheck, boolean 
needLeaseAcquisitionTimeCheck,
+      Timestamp originalEventTimestamp, Timestamp 
originalLeaseAcquisitionTimestamp) throws SQLException {
+    int i = 0;
+    // Value to update
+    statement.setTimestamp(++i, new Timestamp(eventTimeMillis));
+    // Values to check if existing row matches previous read
+    statement.setString(++i, flowAction.getFlowGroup());
+    statement.setString(++i, flowAction.getFlowName());
+    statement.setString(++i, flowAction.getFlowExecutionId());
+    statement.setString(++i, flowAction.getFlowActionType().toString());
+    // Values that may be needed depending on the insert statement
+    if (needEventTimeCheck) {
+      statement.setTimestamp(++i, originalEventTimestamp);
+    }
+    if (needLeaseAcquisitionTimeCheck) {
+      statement.setTimestamp(++i, originalLeaseAcquisitionTimestamp);
+    }
+    // Values to select for return
+    statement.setString(++i, flowAction.getFlowGroup());
+    statement.setString(++i, flowAction.getFlowName());
+    statement.setString(++i, flowAction.getFlowExecutionId());
+    statement.setString(++i, flowAction.getFlowActionType().toString());
+  }
+
+  @Override
+  public boolean recordLeaseSuccess(LeaseObtainedStatus status)
+      throws IOException {
+    DagActionStore.DagAction flowAction = status.getFlowAction();
+    String flowGroup = flowAction.getFlowGroup();
+    String flowName = flowAction.getFlowName();
+    String flowExecutionId = flowAction.getFlowExecutionId();
+    DagActionStore.FlowActionType flowActionType = 
flowAction.getFlowActionType();
+    return 
withPreparedStatement(String.format(CONDITIONALLY_COMPLETE_LEASE_STATEMENT, 
leaseArbiterTableName),
+        updateStatement -> {
+          int i = 0;
+          updateStatement.setString(++i, flowGroup);
+          updateStatement.setString(++i, flowName);
+          updateStatement.setString(++i, flowExecutionId);
+          updateStatement.setString(++i, flowActionType.toString());
+          updateStatement.setTimestamp(++i, new 
Timestamp(status.getEventTimestamp()));
+          updateStatement.setTimestamp(++i, new 
Timestamp(status.getLeaseAcquisitionTimestamp()));
+          int numRowsUpdated = updateStatement.executeUpdate();
+          if (numRowsUpdated == 0) {
+            log.info("Multi-active lease arbiter lease attempt: [%s, 
eventTimestamp: %s] - FAILED to complete because "
+                + "lease expired or event cleaned up before host completed 
required actions", flowAction,
+                status.getEventTimestamp());
+            return false;
+          }
+          if( numRowsUpdated == 1) {
+            log.info("Multi-active lease arbiter lease attempt: [%s, 
eventTimestamp: %s] - COMPLETED, no longer leasing"
+                    + " this event after this.", flowAction, 
status.getEventTimestamp());
+            return true;
+          };
+          throw new IOException(String.format("Attempt to complete lease use: 
[%s, eventTimestamp: %s] - updated more "
+                  + "rows than expected", flowAction, 
status.getEventTimestamp()));
+        }, true);
+  }
+
+  /** Abstracts recurring pattern around resource management and exception 
re-mapping. */
+  protected <T> T withPreparedStatement(String sql, 
CheckedFunction<PreparedStatement, T> f, boolean shouldCommit) throws 
IOException {
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement statement = connection.prepareStatement(sql)) {
+      T result = f.apply(statement);
+      if (shouldCommit) {
+        connection.commit();
+      }
+      return result;
+    } catch (SQLException e) {
+      log.warn("Received SQL exception that can result from invalid 
connection. Checking if validation query is set {} Exception is {}", 
((HikariDataSource) this.dataSource).getConnectionTestQuery(), e);
+      throw new IOException(e);
+    }
+  }
+}
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
index d3f4db11b..ab5faee8c 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
@@ -43,23 +43,21 @@ import org.apache.gobblin.util.ExponentialBackoff;
 public class MysqlDagActionStore implements DagActionStore {
 
   public static final String CONFIG_PREFIX = "MysqlDagActionStore";
-  private static final long GET_DAG_ACTION_INITIAL_WAIT_AFTER_FAILURE = 1000L;
-
 
   protected final DataSource dataSource;
   private final String tableName;
-  private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM 
%s WHERE flow_group = ? AND flow_name =? AND flow_execution_id = ?)";
+  private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM 
%s WHERE flow_group = ? AND flow_name =? AND flow_execution_id = ? AND 
dag_action = ?)";
 
-  protected static final String INSERT_STATEMENT = "INSERT INTO %s 
(flow_group, flow_name, flow_execution_id, dag_action ) "
+  protected static final String INSERT_STATEMENT = "INSERT INTO %s 
(flow_group, flow_name, flow_execution_id, dag_action) "
       + "VALUES (?, ?, ?, ?)";
-  private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE 
flow_group = ? AND flow_name =? AND flow_execution_id = ?";
-  private static final String GET_STATEMENT = "SELECT flow_group, flow_name, 
flow_execution_id, dag_action FROM %s WHERE flow_group = ? AND flow_name =? AND 
flow_execution_id = ?";
+  private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE 
flow_group = ? AND flow_name =? AND flow_execution_id = ? AND dag_action = ?";
+  private static final String GET_STATEMENT = "SELECT flow_group, flow_name, 
flow_execution_id, dag_action FROM %s WHERE flow_group = ? AND flow_name =? AND 
flow_execution_id = ? AND dag_action = ?";
   private static final String GET_ALL_STATEMENT = "SELECT flow_group, 
flow_name, flow_execution_id, dag_action FROM %s";
   private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT 
EXISTS %s (" +
   "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT 
NULL, flow_name varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT 
NULL, "
       + "flow_execution_id varchar(" + 
ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, "
       + "dag_action varchar(100) NOT NULL, modified_time TIMESTAMP DEFAULT 
CURRENT_TIMESTAMP  on update CURRENT_TIMESTAMP NOT NULL, "
-      + "PRIMARY KEY (flow_group,flow_name,flow_execution_id))";
+      + "PRIMARY KEY (flow_group,flow_name,flow_execution_id, dag_action))";
 
   private final int getDagActionMaxRetries;
 
@@ -86,7 +84,7 @@ public class MysqlDagActionStore implements DagActionStore {
   }
 
   @Override
-  public boolean exists(String flowGroup, String flowName, String 
flowExecutionId) throws IOException, SQLException {
+  public boolean exists(String flowGroup, String flowName, String 
flowExecutionId, FlowActionType flowActionType) throws IOException, 
SQLException {
     ResultSet rs = null;
     try (Connection connection = this.dataSource.getConnection();
         PreparedStatement existStatement = 
connection.prepareStatement(String.format(EXISTS_STATEMENT, tableName))) {
@@ -94,12 +92,13 @@ public class MysqlDagActionStore implements DagActionStore {
       existStatement.setString(++i, flowGroup);
       existStatement.setString(++i, flowName);
       existStatement.setString(++i, flowExecutionId);
+      existStatement.setString(++i, flowActionType.toString());
       rs = existStatement.executeQuery();
       rs.next();
       return rs.getBoolean(1);
     } catch (SQLException e) {
-      throw new IOException(String.format("Failure checking existence for 
table %s of flow with flow group:%s, flow name:%s and flow execution id:%s",
-          tableName, flowGroup, flowName, flowExecutionId), e);
+      throw new IOException(String.format("Failure checking existence of 
DagAction: %s in table %s",
+          new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), 
tableName), e);
     } finally {
       if (rs != null) {
         rs.close();
@@ -108,7 +107,7 @@ public class MysqlDagActionStore implements DagActionStore {
   }
 
   @Override
-  public void addDagAction(String flowGroup, String flowName, String 
flowExecutionId, DagActionValue dagActionValue)
+  public void addDagAction(String flowGroup, String flowName, String 
flowExecutionId, FlowActionType flowActionType)
       throws IOException {
     try (Connection connection = this.dataSource.getConnection();
         PreparedStatement insertStatement = 
connection.prepareStatement(String.format(INSERT_STATEMENT, tableName))) {
@@ -116,33 +115,35 @@ public class MysqlDagActionStore implements 
DagActionStore {
       insertStatement.setString(++i, flowGroup);
       insertStatement.setString(++i, flowName);
       insertStatement.setString(++i, flowExecutionId);
-      insertStatement.setString(++i, dagActionValue.toString());
+      insertStatement.setString(++i, flowActionType.toString());
       insertStatement.executeUpdate();
       connection.commit();
     } catch (SQLException e) {
-      throw new IOException(String.format("Failure to adding action for table 
%s of flow with flow group:%s, flow name:%s and flow execution id:%s",
-          tableName, flowGroup, flowName, flowExecutionId), e);
+      throw new IOException(String.format("Failure adding action for 
DagAction: %s in table %s",
+          new DagAction(flowGroup, flowName, flowExecutionId, flowActionType), 
tableName), e);
     }
   }
 
   @Override
-  public boolean deleteDagAction(String flowGroup, String flowName, String 
flowExecutionId) throws IOException {
+  public boolean deleteDagAction(DagAction dagAction) throws IOException {
     try (Connection connection = this.dataSource.getConnection();
         PreparedStatement deleteStatement = 
connection.prepareStatement(String.format(DELETE_STATEMENT, tableName))) {
       int i = 0;
-      deleteStatement.setString(++i, flowGroup);
-      deleteStatement.setString(++i, flowName);
-      deleteStatement.setString(++i, flowExecutionId);
+      deleteStatement.setString(++i, dagAction.getFlowGroup());
+      deleteStatement.setString(++i, dagAction.getFlowName());
+      deleteStatement.setString(++i, dagAction.getFlowExecutionId());
+      deleteStatement.setString(++i, dagAction.getFlowActionType().toString());
       int result = deleteStatement.executeUpdate();
       connection.commit();
       return result != 0;
     } catch (SQLException e) {
-      throw new IOException(String.format("Failure to delete action for table 
%s of flow with flow group:%s, flow name:%s and flow execution id:%s",
-          tableName, flowGroup, flowName, flowExecutionId), e);
+      throw new IOException(String.format("Failure deleting action for 
DagAction: %s in table %s", dagAction,
+          tableName), e);
     }
   }
 
-  private DagAction getDagActionWithRetry(String flowGroup, String flowName, 
String flowExecutionId, ExponentialBackoff exponentialBackoff)
+  // TODO: later change this to getDagActions relating to a particular flow 
execution if it makes sense
+  private DagAction getDagActionWithRetry(String flowGroup, String flowName, 
String flowExecutionId, FlowActionType flowActionType, ExponentialBackoff 
exponentialBackoff)
       throws IOException, SQLException {
     ResultSet rs = null;
     try (Connection connection = this.dataSource.getConnection();
@@ -151,20 +152,22 @@ public class MysqlDagActionStore implements 
DagActionStore {
       getStatement.setString(++i, flowGroup);
       getStatement.setString(++i, flowName);
       getStatement.setString(++i, flowExecutionId);
+      getStatement.setString(++i, flowActionType.toString());
       rs = getStatement.executeQuery();
       if (rs.next()) {
-        return new DagAction(rs.getString(1), rs.getString(2), 
rs.getString(3), DagActionValue.valueOf(rs.getString(4)));
+        return new DagAction(rs.getString(1), rs.getString(2), 
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
       } else {
         if (exponentialBackoff.awaitNextRetryIfAvailable()) {
-          return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, 
exponentialBackoff);
+          return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, 
flowActionType, exponentialBackoff);
         } else {
-          log.warn(String.format("Can not find dag action with flowGroup: %s, 
flowName: %s, flowExecutionId: %s",flowGroup, flowName, flowExecutionId));
+          log.warn(String.format("Can not find dag action: %s with flowGroup: 
%s, flowName: %s, flowExecutionId: %s",
+              flowActionType, flowGroup, flowName, flowExecutionId));
           return null;
         }
       }
     } catch (SQLException | InterruptedException e) {
-      throw new IOException(String.format("Failure get dag action from table 
%s of flow with flow group:%s, flow name:%s and flow execution id:%s",
-          tableName, flowGroup, flowName, flowExecutionId), e);
+      throw new IOException(String.format("Failure get %s from table %s", new 
DagAction(flowGroup, flowName, flowExecutionId,
+          flowActionType), tableName), e);
     } finally {
       if (rs != null) {
         rs.close();
@@ -173,13 +176,6 @@ public class MysqlDagActionStore implements DagActionStore 
{
 
   }
 
-  @Override
-  public DagAction getDagAction(String flowGroup, String flowName, String 
flowExecutionId)
-      throws IOException, SQLException {
-    ExponentialBackoff exponentialBackoff = 
ExponentialBackoff.builder().initialDelay(GET_DAG_ACTION_INITIAL_WAIT_AFTER_FAILURE).maxRetries(this.getDagActionMaxRetries).build();
-    return getDagActionWithRetry(flowGroup, flowName, flowExecutionId, 
exponentialBackoff);
-  }
-
   @Override
   public Collection<DagAction> getDagActions() throws IOException {
     HashSet<DagAction> result = new HashSet<>();
@@ -188,7 +184,7 @@ public class MysqlDagActionStore implements DagActionStore {
         ResultSet rs = getAllStatement.executeQuery()) {
       while (rs.next()) {
         result.add(
-            new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), 
DagActionValue.valueOf(rs.getString(4))));
+            new DagAction(rs.getString(1), rs.getString(2), rs.getString(3), 
FlowActionType.valueOf(rs.getString(4))));
       }
       if (rs != null) {
         rs.close();
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index 3d9e9b5c5..dfccb0c07 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -46,6 +46,7 @@ public class RuntimeMetrics {
   public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".dagActionStoreMonitor.kills.invoked";
   public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED= 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".dagActionStoreMonitor.message.processed";
   public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED 
= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".dagActionStoreMonitor.resumes.invoked";
+  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".dagActionStoreMonitor.flows.launched";
   public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".dagActionStoreMonitor.unexpected.errors";
   public static final String
       GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".dagActionStoreMonitor.produce.to.consume.delay";
@@ -72,6 +73,8 @@ public class RuntimeMetrics {
   public static final String GOBBLIN_JOB_SCHEDULER_TOTAL_GET_SPEC_TIME_NANOS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".jobScheduler.totalGetSpecTimeNanos";
   public static final String GOBBLIN_JOB_SCHEDULER_TOTAL_ADD_SPEC_TIME_NANOS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".jobScheduler.totalAddSpecTimeNanos";
   public static final String 
GOBBLIN_JOB_SCHEDULER_NUM_JOBS_SCHEDULED_DURING_STARTUP = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".jobScheduler.numJobsScheduledDuringStartup";
+  // Metrics Used to Track flowTriggerHandlerProgress
+  public static final String GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED 
= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".flowTriggerHandler.numFlowsSubmitted";
   // Metadata keys
   public static final String TOPIC = "topic";
   public static final String GROUP_ID = "groupId";
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java
index d0e42f525..b9ff94f8a 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java
@@ -25,5 +25,8 @@ public final class InjectionNames {
   public static final String SERVICE_NAME = "serviceName";
   public static final String FORCE_LEADER = "forceLeader";
   public static final String FLOW_CATALOG_LOCAL_COMMIT = 
"flowCatalogLocalCommit";
+
+  // TODO: Rename `warm_standby_enabled` config to 
`message_forwarding_enabled` since it's a misnomer.
   public static final String WARM_STANDBY_ENABLED = "statelessRestAPIEnabled";
+  public static final String MULTI_ACTIVE_SCHEDULER_ENABLED = 
"multiActiveSchedulerEnabled";
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
index 1ae73e934..56b1ac8c0 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
@@ -396,7 +396,7 @@ public class JobScheduler extends AbstractIdleService {
 
     try {
       // Schedule the Quartz job with a trigger built from the job 
configuration
-      Trigger trigger = getTrigger(job.getKey(), jobProps);
+      Trigger trigger = createTriggerForJob(job.getKey(), jobProps);
       this.scheduler.getScheduler().scheduleJob(job, trigger);
       LOG.info(String.format("Scheduled job %s. Next run: %s.", job.getKey(), 
trigger.getNextFireTime()));
     } catch (SchedulerException se) {
@@ -581,7 +581,7 @@ public class JobScheduler extends AbstractIdleService {
   /**
    * Get a {@link org.quartz.Trigger} from the given job configuration 
properties.
    */
-  private Trigger getTrigger(JobKey jobKey, Properties jobProps) {
+  public static Trigger createTriggerForJob(JobKey jobKey, Properties 
jobProps) {
     // Build a trigger for the job with the given cron-style schedule
     return TriggerBuilder.newTrigger()
         .withIdentity(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY),
@@ -600,11 +600,18 @@ public class JobScheduler extends AbstractIdleService {
     @Override
     public void executeImpl(JobExecutionContext context)
         throws JobExecutionException {
-      LOG.info("Starting job " + context.getJobDetail().getKey());
-      JobDataMap dataMap = context.getJobDetail().getJobDataMap();
+      JobDetail jobDetail = context.getJobDetail();
+      LOG.info("Starting job " + jobDetail.getKey());
+      JobDataMap dataMap = jobDetail.getJobDataMap();
       JobScheduler jobScheduler = (JobScheduler) 
dataMap.get(JOB_SCHEDULER_KEY);
       Properties jobProps = (Properties) dataMap.get(PROPERTIES_KEY);
       JobListener jobListener = (JobListener) dataMap.get(JOB_LISTENER_KEY);
+      // Obtain trigger timestamp from trigger to pass to jobProps
+      Trigger trigger = context.getTrigger();
+      // THIS current event has already fired if this method is called, so it 
now exists in <previousFireTime>
+      long triggerTimestampMillis = trigger.getPreviousFireTime().getTime();
+      
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
+          String.valueOf(triggerTimestampMillis));
 
       try {
         jobScheduler.runJob(jobProps, jobListener);
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStoreTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStoreTest.java
index 0c65f2241..255dd0789 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStoreTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStoreTest.java
@@ -61,40 +61,43 @@ public class MysqlDagActionStoreTest {
 
   @Test
   public void testAddAction() throws Exception {
-    this.mysqlDagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId, DagActionStore.DagActionValue.KILL);
-    //Should not be able to add again when previous one exist
+    this.mysqlDagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId, DagActionStore.FlowActionType.KILL);
+    //Should not be able to add KILL again when previous one exist
     Assert.expectThrows(IOException.class,
-        () -> this.mysqlDagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId, DagActionStore.DagActionValue.RESUME));
-    //Should be able to add un-exist one
-    this.mysqlDagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId_2, DagActionStore.DagActionValue.RESUME);
+        () -> this.mysqlDagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId, DagActionStore.FlowActionType.KILL));
+    //Should be able to add a RESUME action for same execution as well as KILL 
for another execution of the flow
+    this.mysqlDagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId, DagActionStore.FlowActionType.RESUME);
+    this.mysqlDagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId_2, DagActionStore.FlowActionType.KILL);
   }
 
   @Test(dependsOnMethods = "testAddAction")
   public void testExists() throws Exception {
-    Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, 
flowExecutionId));
-    Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, 
flowExecutionId_2));
-    Assert.assertFalse(this.mysqlDagActionStore.exists(flowGroup, flowName, 
flowExecutionId_3));
+    Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, 
flowExecutionId, DagActionStore.FlowActionType.KILL));
+    Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, 
flowExecutionId, DagActionStore.FlowActionType.RESUME));
+    Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, 
flowExecutionId_2, DagActionStore.FlowActionType.KILL));
+    Assert.assertFalse(this.mysqlDagActionStore.exists(flowGroup, flowName, 
flowExecutionId_3, DagActionStore.FlowActionType.RESUME));
+    Assert.assertFalse(this.mysqlDagActionStore.exists(flowGroup, flowName, 
flowExecutionId_3, DagActionStore.FlowActionType.KILL));
   }
 
   @Test(dependsOnMethods = "testExists")
-  public void testGetAction() throws IOException, SQLException {
-    Assert.assertEquals(new DagActionStore.DagAction(flowGroup, flowName, 
flowExecutionId, DagActionStore.DagActionValue.KILL), 
this.mysqlDagActionStore.getDagAction(flowGroup, flowName, flowExecutionId));
-    Assert.assertEquals(new DagActionStore.DagAction(flowGroup, flowName, 
flowExecutionId_2, DagActionStore.DagActionValue.RESUME), 
this.mysqlDagActionStore.getDagAction(flowGroup, flowName, flowExecutionId_2));
+  public void testGetActions() throws IOException {
     Collection<DagActionStore.DagAction> dagActions = 
this.mysqlDagActionStore.getDagActions();
-    Assert.assertEquals(2, dagActions.size());
+    Assert.assertEquals(3, dagActions.size());
     HashSet<DagActionStore.DagAction> set = new HashSet<>();
-    set.add(new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.DagActionValue.KILL));
-    set.add(new DagActionStore.DagAction(flowGroup, flowName, 
flowExecutionId_2, DagActionStore.DagActionValue.RESUME));
+    set.add(new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.KILL));
+    set.add(new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.RESUME));
+    set.add(new DagActionStore.DagAction(flowGroup, flowName, 
flowExecutionId_2, DagActionStore.FlowActionType.KILL));
     Assert.assertEquals(dagActions, set);
   }
 
-  @Test(dependsOnMethods = "testGetAction")
+  @Test(dependsOnMethods = "testGetActions")
   public void testDeleteAction() throws IOException, SQLException {
-   this.mysqlDagActionStore.deleteDagAction(flowGroup, flowName, 
flowExecutionId);
-   Assert.assertEquals(this.mysqlDagActionStore.getDagActions().size(), 1);
-   Assert.assertFalse(this.mysqlDagActionStore.exists(flowGroup, flowName, 
flowExecutionId));
-   Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, 
flowExecutionId_2));
-   Assert.assertNull( this.mysqlDagActionStore.getDagAction(flowGroup, 
flowName, flowExecutionId));
+   this.mysqlDagActionStore.deleteDagAction(
+       new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.KILL));
+   Assert.assertEquals(this.mysqlDagActionStore.getDagActions().size(), 2);
+   Assert.assertFalse(this.mysqlDagActionStore.exists(flowGroup, flowName, 
flowExecutionId, DagActionStore.FlowActionType.KILL));
+    Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, 
flowExecutionId, DagActionStore.FlowActionType.RESUME));
+   Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName, 
flowExecutionId_2, DagActionStore.FlowActionType.KILL));
   }
 
 }
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
index 0a3640da5..03081b3ba 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
@@ -43,6 +43,9 @@ public class GobblinServiceConfiguration {
   @Getter
   private final boolean isWarmStandbyEnabled;
 
+  @Getter
+  private final boolean isMultiActiveSchedulerEnabled;
+
   @Getter
   private final boolean isTopologyCatalogEnabled;
 
@@ -107,6 +110,7 @@ public class GobblinServiceConfiguration {
     }
 
     this.isWarmStandbyEnabled = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY, false);
+    this.isMultiActiveSchedulerEnabled = ConfigUtils.getBoolean(config, 
ServiceConfigKeys.GOBBLIN_SERVICE_MULTI_ACTIVE_SCHEDULER_ENABLED_KEY, false);
 
     this.isHelixManagerEnabled = 
config.hasPath(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY);
     this.isDagManagerEnabled =
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 642f78f11..c0f140a9f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -20,7 +20,10 @@ package org.apache.gobblin.service.modules.core;
 import java.util.Objects;
 
 import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
 import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
+import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
 import 
org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby;
@@ -147,6 +150,9 @@ public class GobblinServiceGuiceModule implements Module {
     binder.bindConstant()
         .annotatedWith(Names.named(InjectionNames.WARM_STANDBY_ENABLED))
         .to(serviceConfig.isWarmStandbyEnabled());
+    binder.bindConstant()
+        
.annotatedWith(Names.named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED))
+        .to(serviceConfig.isMultiActiveSchedulerEnabled());
     OptionalBinder.newOptionalBinder(binder, DagActionStore.class);
     if (serviceConfig.isWarmStandbyEnabled()) {
       binder.bind(DagActionStore.class).to(MysqlDagActionStore.class);
@@ -159,6 +165,12 @@ public class GobblinServiceGuiceModule implements Module {
       
binder.bind(FlowExecutionResourceHandler.class).to(GobblinServiceFlowExecutionResourceHandler.class);
     }
 
+    OptionalBinder.newOptionalBinder(binder, MultiActiveLeaseArbiter.class);
+    OptionalBinder.newOptionalBinder(binder, FlowTriggerHandler.class);
+    if (serviceConfig.isMultiActiveSchedulerEnabled()) {
+      binder.bind(MysqlMultiActiveLeaseArbiter.class);
+      binder.bind(FlowTriggerHandler.class);
+    }
 
     binder.bind(FlowConfigsResource.class);
     binder.bind(FlowConfigsV2Resource.class);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index f47cbde50..80da8a9e9 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -277,8 +277,9 @@ public class DagManager extends AbstractIdleService {
    * @param dag {@link Dag} to be added
    * @param persist whether to persist the dag to the {@link DagStateStore}
    * @param setStatus if true, set all jobs in the dag to pending
+   * Note this should only be called from the {@link Orchestrator} or {@link 
org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor}
    */
-  synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist, boolean 
setStatus) throws IOException {
+  public synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist, 
boolean setStatus) throws IOException {
     if (persist) {
       //Persist the dag
       this.dagStateStore.writeCheckpoint(dag);
@@ -425,7 +426,7 @@ public class DagManager extends AbstractIdleService {
         if (dagActionStore.isPresent()) {
           Collection<DagActionStore.DagAction> dagActions = 
dagActionStore.get().getDagActions();
           for (DagActionStore.DagAction action : dagActions) {
-            switch (action.getDagActionValue()) {
+            switch (action.getFlowActionType()) {
               case KILL:
                 this.handleKillFlowEvent(new 
KillFlowEvent(action.getFlowGroup(), action.getFlowName(), 
Long.parseLong(action.getFlowExecutionId())));
                 break;
@@ -433,7 +434,7 @@ public class DagManager extends AbstractIdleService {
                 this.handleResumeFlowEvent(new 
ResumeFlowEvent(action.getFlowGroup(), action.getFlowName(), 
Long.parseLong(action.getFlowExecutionId())));
                 break;
               default:
-                log.warn("Unsupported dagAction: " + 
action.getDagActionValue().toString());
+                log.warn("Unsupported dagAction: " + 
action.getFlowActionType().toString());
             }
           }
         }
@@ -578,9 +579,10 @@ public class DagManager extends AbstractIdleService {
       }
     }
 
-    private void clearUpDagAction(DagId dagId) throws IOException {
+    private void removeDagActionFromStore(DagId dagId, 
DagActionStore.FlowActionType flowActionType) throws IOException {
       if (this.dagActionStore.isPresent()) {
-        this.dagActionStore.get().deleteDagAction(dagId.flowGroup, 
dagId.flowName, dagId.flowExecutionId);
+        this.dagActionStore.get().deleteDagAction(
+            new DagActionStore.DagAction(dagId.flowGroup, dagId.flowName, 
dagId.flowExecutionId, flowActionType));
       }
     }
 
@@ -592,13 +594,13 @@ public class DagManager extends AbstractIdleService {
       String dagId= dagIdToResume.toString();
       if (!this.failedDagIds.contains(dagId)) {
         log.warn("No dag found with dagId " + dagId + ", so cannot resume 
flow");
-        clearUpDagAction(dagIdToResume);
+        removeDagActionFromStore(dagIdToResume, 
DagActionStore.FlowActionType.RESUME);
         return;
       }
       Dag<JobExecutionPlan> dag = this.failedDagStateStore.getDag(dagId);
       if (dag == null) {
         log.error("Dag " + dagId + " was found in memory but not found in 
failed dag state store");
-        clearUpDagAction(dagIdToResume);
+        removeDagActionFromStore(dagIdToResume, 
DagActionStore.FlowActionType.RESUME);
         return;
       }
 
@@ -649,7 +651,7 @@ public class DagManager extends AbstractIdleService {
         if (dagReady) {
           this.dagStateStore.writeCheckpoint(dag.getValue());
           this.failedDagStateStore.cleanUp(dag.getValue());
-          clearUpDagAction(DagManagerUtils.generateDagId(dag.getValue()));
+          
removeDagActionFromStore(DagManagerUtils.generateDagId(dag.getValue()), 
DagActionStore.FlowActionType.RESUME);
           this.failedDagIds.remove(dag.getKey());
           this.resumingDags.remove(dag.getKey());
           initialize(dag.getValue());
@@ -678,7 +680,8 @@ public class DagManager extends AbstractIdleService {
       } else {
         log.warn("Did not find Dag with id {}, it might be already 
cancelled/finished.", dagToCancel);
       }
-      clearUpDagAction(dagId);
+      // Called after a KILL request is received
+      removeDagActionFromStore(dagId, DagActionStore.FlowActionType.KILL);
     }
 
     private void cancelDagNode(DagNode<JobExecutionPlan> dagNodeToCancel) 
throws ExecutionException, InterruptedException {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
new file mode 100644
index 000000000..42ab0af96
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Random;
+
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.scheduler.JobScheduler;
+import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Handler used to coordinate multiple hosts with enabled schedulers to 
respond to flow action events. It uses the
+ * {@link MysqlMultiActiveLeaseArbiter} to determine a single lease owner at a 
given time
+ * for a flow action event. After acquiring the lease, it persists the flow 
action event to the {@link DagActionStore}
+ * to be eventually acted upon by the host with the active DagManager. Once it 
has completed this action, it will mark
+ * the lease as completed by calling the
+ * {@link MysqlMultiActiveLeaseArbiter.recordLeaseSuccess()} method. Hosts 
that do not gain the lease for the event,
+ * instead schedule a reminder using the {@link SchedulerService} to check 
back in on the previous lease owner's
+ * completion status after the lease should expire to ensure the event is 
handled in failure cases.
+ */
+@Slf4j
+public class FlowTriggerHandler {
+  private final int schedulerMaxBackoffMillis;
+  private static Random random = new Random();
+  protected MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+  protected SchedulerService schedulerService;
+  protected DagActionStore dagActionStore;
+  private MetricContext metricContext;
+  private ContextAwareMeter numFlowsSubmitted;
+
+  @Inject
+  public FlowTriggerHandler(Config config, MultiActiveLeaseArbiter 
leaseDeterminationStore,
+      SchedulerService schedulerService, DagActionStore dagActionStore) {
+    this.schedulerMaxBackoffMillis = ConfigUtils.getInt(config, 
ConfigurationKeys.SCHEDULER_MAX_BACKOFF_MILLIS_KEY,
+        ConfigurationKeys.DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS);
+    this.multiActiveLeaseArbiter = leaseDeterminationStore;
+    this.schedulerService = schedulerService;
+    this.dagActionStore = dagActionStore;
+    this.metricContext = Instrumented.getMetricContext(new 
org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
+        this.getClass());
+    this.numFlowsSubmitted = 
metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED);
+  }
+
+  /**
+   * This method is used in the multi-active scheduler case for one or more 
hosts to respond to a flow action event
+   * by attempting a lease for the flow event and processing the result 
depending on the status of the attempt.
+   * @param jobProps
+   * @param flowAction
+   * @param eventTimeMillis
+   * @throws IOException
+   */
+  public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction 
flowAction, long eventTimeMillis)
+      throws IOException {
+    MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
+        multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
+    // TODO: add a log event or metric for each of these cases
+    if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+      MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = 
(MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
+      if (persistFlowAction(leaseObtainedStatus)) {
+        return;
+      }
+      // If persisting the flow action failed, then we set another trigger for 
this event to occur immediately to
+      // re-attempt handling the event
+      scheduleReminderForEvent(jobProps, new 
MultiActiveLeaseArbiter.LeasedToAnotherStatus(flowAction,
+          leaseObtainedStatus.getEventTimestamp(), 0L), eventTimeMillis);
+      return;
+    } else if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+      scheduleReminderForEvent(jobProps, 
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus,
+          eventTimeMillis);
+      return;
+    } else if (leaseAttemptStatus instanceof  
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+      return;
+    }
+    throw new RuntimeException(String.format("Received type of 
leaseAttemptStatus: %s not handled by this method",
+            leaseAttemptStatus.getClass().getName()));
+  }
+
+  // Called after obtaining a lease to persist the flow action to {@link 
DagActionStore} and mark the lease as done
+  private boolean 
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus leaseStatus) {
+    try {
+      DagActionStore.DagAction flowAction = leaseStatus.getFlowAction();
+      this.dagActionStore.addDagAction(flowAction.getFlowGroup(), 
flowAction.getFlowName(),
+          flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
+      // If the flow action has been persisted to the {@link DagActionStore} 
we can close the lease
+      this.numFlowsSubmitted.mark();
+      return this.multiActiveLeaseArbiter.recordLeaseSuccess(leaseStatus);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * This method is used by {@link FlowTriggerHandler.handleTriggerEvent} to 
schedule a self-reminder to check on
+   * the other participant's progress to finish acting on a flow action after 
the time the lease should expire.
+   * @param jobProps
+   * @param status used to extract event to be reminded for and the minimum 
time after which reminder should occur
+   * @param originalEventTimeMillis the event timestamp we were originally 
handling
+   */
+  private void scheduleReminderForEvent(Properties jobProps, 
MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
+      long originalEventTimeMillis) {
+    DagActionStore.DagAction flowAction = status.getFlowAction();
+    // Add a small randomization to the minimum reminder wait time to avoid 
'thundering herd' issue
+    String cronExpression = 
createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
+        + random.nextInt(schedulerMaxBackoffMillis));
+    jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
+    // Ensure we save the event timestamp that we're setting reminder for to 
have for debugging purposes
+    // in addition to the event we want to initiate
+    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
+        String.valueOf(status.getEventTimeMillis()));
+    
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
+        String.valueOf(originalEventTimeMillis));
+    JobKey key = new JobKey(flowAction.getFlowName(), 
flowAction.getFlowGroup());
+    // Create a new trigger for the flow in job scheduler that is set to fire 
at the minimum reminder wait time calculated
+    Trigger trigger = JobScheduler.createTriggerForJob(key, jobProps);
+    try {
+      log.info("Flow Trigger Handler - [%s, eventTimestamp: %s] -  attempting 
to schedule reminder for event %s in %s millis",
+          flowAction, originalEventTimeMillis, status.getEventTimeMillis(), 
trigger.getNextFireTime());
+      this.schedulerService.getScheduler().scheduleJob(trigger);
+    } catch (SchedulerException e) {
+      log.warn("Failed to add job reminder due to SchedulerException for job 
%s trigger event %s ", key, status.getEventTimeMillis(), e);
+    }
+    log.info(String.format("Flow Trigger Handler - [%s, eventTimestamp: %s] - 
SCHEDULED REMINDER for event %s in %s millis",
+        flowAction, originalEventTimeMillis, status.getEventTimeMillis(), 
trigger.getNextFireTime()));
+  }
+
+  /**
+   * These methods should only be called from the Orchestrator or JobScheduler 
classes as it directly adds jobs to the
+   * Quartz scheduler
+   * @param delayPeriodMillis
+   * @return
+   */
+  protected static String createCronFromDelayPeriod(long delayPeriodMillis) {
+    LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC"));
+    LocalDateTime timeToScheduleReminder = now.plus(delayPeriodMillis, 
ChronoUnit.MILLIS);
+    // TODO: investigate potentially better way of generating cron expression 
that does not make it US dependent
+    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("ss mm HH dd MM 
? yyyy", Locale.US);
+    return timeToScheduleReminder.format(formatter);
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index c9a21560c..a0c196789 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -56,6 +56,7 @@ import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
@@ -103,7 +104,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
   private FlowStatusGenerator flowStatusGenerator;
 
   private UserQuotaManager quotaManager;
-
+  private Optional<FlowTriggerHandler> flowTriggerHandler;
 
   private final ClassAliasResolver<SpecCompiler> aliasResolver;
 
@@ -111,13 +112,13 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
 
 
   public Orchestrator(Config config, Optional<TopologyCatalog> 
topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log,
-      FlowStatusGenerator flowStatusGenerator, boolean instrumentationEnabled) 
{
+      FlowStatusGenerator flowStatusGenerator, boolean instrumentationEnabled, 
Optional<FlowTriggerHandler> flowTriggerHandler) {
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
     this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
     this.topologyCatalog = topologyCatalog;
     this.dagManager = dagManager;
     this.flowStatusGenerator = flowStatusGenerator;
-
+    this.flowTriggerHandler = flowTriggerHandler;
     try {
       String specCompilerClassName = 
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
       if 
(config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
@@ -160,8 +161,8 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
 
   @Inject
   public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator, 
Optional<TopologyCatalog> topologyCatalog,
-      Optional<DagManager> dagManager, Optional<Logger> log) {
-    this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true);
+      Optional<DagManager> dagManager, Optional<Logger> log, 
Optional<FlowTriggerHandler> flowTriggerHandler) {
+    this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true, 
flowTriggerHandler);
   }
 
 
@@ -222,7 +223,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
 
   }
 
-  public void orchestrate(Spec spec) throws Exception {
+  public void orchestrate(Spec spec, Properties jobProps, long 
triggerTimestampMillis) throws Exception {
     // Add below waiting because TopologyCatalog and FlowCatalog service can 
be launched at the same time
     this.topologyCatalog.get().getInitComplete().await();
 
@@ -310,19 +311,28 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
         flowCompilationTimer.get().stop(flowMetadata);
       }
 
-      if (this.dagManager.isPresent()) {
-        try {
-          //Send the dag to the DagManager.
-          this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
-        } catch (Exception ex) {
+      // If multi-active scheduler is enabled do not pass onto DagManager, 
otherwise scheduler forwards it directly
+      if (flowTriggerHandler.isPresent()) {
+        // If triggerTimestampMillis is 0, then it was not set by the job 
trigger handler, and we cannot handle this event
+        if (triggerTimestampMillis == 0L) {
+          _log.warn("Skipping execution of spec: {} because missing trigger 
timestamp in job properties",
+              jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+          flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow orchestration 
skipped because no trigger timestamp "
+              + "associated with flow action.");
           if (this.eventSubmitter.isPresent()) {
-            // pronounce failed before stack unwinds, to ensure flow not 
marooned in `COMPILED` state; (failure likely attributable to DB 
connection/failover)
-            String failureMessage = "Failed to add Job Execution Plan due to: 
" + ex.getMessage();
-            flowMetadata.put(TimingEvent.METADATA_MESSAGE, failureMessage);
             new TimingEvent(this.eventSubmitter.get(), 
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
           }
-          throw ex;
+          return;
         }
+
+        String flowExecutionId = 
flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+        DagActionStore.DagAction flowAction =
+            new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.LAUNCH);
+        flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction, 
triggerTimestampMillis);
+        _log.info("Multi-active scheduler finished handling trigger event: 
[%s, triggerEventTimestamp: %s]", flowAction,
+            triggerTimestampMillis);
+      } else if (this.dagManager.isPresent()) {
+        submitFlowToDagManager((FlowSpec) spec, jobExecutionPlanDag);
       } else {
         // Schedule all compiled JobSpecs on their respective Executor
         for (Dag.DagNode<JobExecutionPlan> dagNode : 
jobExecutionPlanDag.getNodes()) {
@@ -364,6 +374,28 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - 
startTime, TimeUnit.NANOSECONDS);
   }
 
  /**
   * Compiles the given {@link FlowSpec} and forwards the resulting dag to the DagManager.
   * @throws IOException propagated from forwarding the compiled dag to the DagManager
   */
  public void submitFlowToDagManager(FlowSpec flowSpec)
      throws IOException {
    submitFlowToDagManager(flowSpec, specCompiler.compileFlow(flowSpec));
  }
+
  /**
   * Sends a pre-compiled dag to the DagManager for execution (persisted, with job statuses set to pending).
   * On failure, emits a FLOW_FAILED timing event before rethrowing so the flow is not left in `COMPILED` state.
   * NOTE(review): calls {@code this.dagManager.get()} without an isPresent() check — assumes callers only invoke
   * this when the DagManager is enabled; confirm against call sites.
   * @throws IOException if the DagManager fails to accept or persist the dag
   */
  public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobExecutionPlanDag)
      throws IOException {
    try {
      //Send the dag to the DagManager.
      this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
    } catch (Exception ex) {
      if (this.eventSubmitter.isPresent()) {
        // pronounce failed before stack unwinds, to ensure flow not marooned in `COMPILED` state; (failure likely attributable to DB connection/failover)
        String failureMessage = "Failed to add Job Execution Plan due to: " + ex.getMessage();
        Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata(flowSpec);
        flowMetadata.put(TimingEvent.METADATA_MESSAGE, failureMessage);
        new TimingEvent(this.eventSubmitter.get(), TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
      }
      throw ex;
    }
  }
+
   /**
    * Check if a FlowSpec instance is allowed to run.
    *
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
index 65b464c88..99661305f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
@@ -30,8 +30,8 @@ import 
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.util.ConfigUtils;
 
 
-class TimingEventUtils {
-  static Map<String, String> getFlowMetadata(FlowSpec flowSpec) {
+public class TimingEventUtils {
+  public static Map<String, String> getFlowMetadata(FlowSpec flowSpec) {
     return getFlowMetadata(flowSpec.getConfig());
   }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
index 3919a3a7d..0b5d1cdc7 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
@@ -30,7 +30,6 @@ import java.sql.SQLException;
 import javax.inject.Named;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.util.InjectionNames;
 import org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
 import org.apache.gobblin.service.modules.core.GobblinServiceManager;
@@ -54,34 +53,29 @@ public class 
GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends G
     String flowName = key.getKey().getFlowName();
     Long flowExecutionId = key.getKey().getFlowExecutionId();
     try {
-      // If an existing resume or kill request is still pending then do not 
accept this request
-      if (this.dagActionStore.exists(flowGroup, flowName, 
flowExecutionId.toString())) {
-        DagActionStore.DagActionValue action = 
this.dagActionStore.getDagAction(flowGroup, flowName, 
flowExecutionId.toString()).getDagActionValue();
-        this.handleException(flowGroup, flowName, flowExecutionId.toString(),
-            new RuntimeException("There is already a pending action " + action 
+ " for this flow. Please wait to resubmit and wait for"
-                + " action to be completed."));
+      // If an existing resume request is still pending then do not accept 
this request
+      if (this.dagActionStore.exists(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME)) {
+        this.prepareError("There is already a pending RESUME action for this 
flow. Please wait to resubmit and wait "
+            + "for action to be completed.", HttpStatus.S_409_CONFLICT);
         return;
       }
-      this.dagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.DagActionValue.RESUME);
-    } catch (IOException | SQLException | SpecNotFoundException e) {
+      this.dagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME);
+    } catch (IOException | SQLException e) {
       log.warn(
           String.format("Failed to add execution resume action for flow %s %s 
%s to dag action store due to", flowGroup,
               flowName, flowExecutionId), e);
-      this.handleException(flowGroup, flowName, flowExecutionId.toString(), e);
+      this.prepareError(e.getMessage(), 
HttpStatus.S_500_INTERNAL_SERVER_ERROR);
     }
 
   }
 
-  private void handleException (String flowGroup, String flowName, String 
flowExecutionId, Exception e) {
-    try {
-      if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId)) {
-        throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, 
e.getMessage());
-      } else {
-        throw new 
RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
-      }
-    } catch (IOException | SQLException ex) {
-      throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, 
e.getMessage());
+  private void prepareError(String exceptionMessage, HttpStatus errorType) {
+    if (errorType == HttpStatus.S_409_CONFLICT) {
+      throw new RestLiServiceException(HttpStatus.S_409_CONFLICT, 
exceptionMessage);
+    } else if (errorType == HttpStatus.S_400_BAD_REQUEST) {
+      throw new RestLiServiceException(HttpStatus.S_400_BAD_REQUEST);
     }
+    throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, 
exceptionMessage);
   }
 
   @Override
@@ -91,21 +85,16 @@ public class 
GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends G
     String flowName = key.getKey().getFlowName();
     Long flowExecutionId = key.getKey().getFlowExecutionId();
     try {
-      // If an existing resume or kill request is still pending then do not 
accept this request
-      if (this.dagActionStore.exists(flowGroup, flowName, 
flowExecutionId.toString())) {
-        DagActionStore.DagActionValue action = 
this.dagActionStore.getDagAction(flowGroup, flowName, 
flowExecutionId.toString()).getDagActionValue();
-        this.handleException(flowGroup, flowName, flowExecutionId.toString(),
-            new RuntimeException("There is already a pending " + action + " 
action for this flow. Please wait to resubmit and wait for"
-                + " action to be completed."));
-        return new UpdateResponse(HttpStatus.S_400_BAD_REQUEST);
+      // If an existing kill request is still pending then do not accept this 
request
+      if (this.dagActionStore.exists(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.FlowActionType.KILL)) {
+        this.prepareError("There is already a pending KILL action for this 
flow. Please wait to resubmit and wait "
+            + "for action to be completed.", HttpStatus.S_400_BAD_REQUEST);
       }
-      this.dagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.DagActionValue.KILL);
+      this.dagActionStore.addDagAction(flowGroup, flowName, 
flowExecutionId.toString(), DagActionStore.FlowActionType.KILL);
       return new UpdateResponse(HttpStatus.S_200_OK);
-    } catch (IOException | SQLException | SpecNotFoundException e) {
-      log.warn(
-          String.format("Failed to add execution delete action for flow %s %s 
%s to dag action store due to", flowGroup,
-              flowName, flowExecutionId), e);
-      this.handleException(flowGroup, flowName, flowExecutionId.toString(), e);
+    } catch (IOException | SQLException e) {
+      this.prepareError(String.format("Failed to add execution delete action 
for flow %s %s %s to dag action store due to", flowGroup,
+          flowName, flowExecutionId), HttpStatus.S_500_INTERNAL_SERVER_ERROR);
       return new UpdateResponse(HttpStatus.S_500_INTERNAL_SERVER_ERROR);
     }
   }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 5ee5f9789..b62a869ba 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -79,6 +79,7 @@ import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
@@ -108,6 +109,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
   protected final Orchestrator orchestrator;
   protected final Boolean warmStandbyEnabled;
   protected final Optional<UserQuotaManager> quotaManager;
+  protected final Optional<FlowTriggerHandler> flowTriggerHandler;
   @Getter
   protected final Map<String, Spec> scheduledFlowSpecs;
   @Getter
@@ -163,7 +165,8 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
       Config config,
       Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, 
Optional<TopologyCatalog> topologyCatalog,
       Orchestrator orchestrator, SchedulerService schedulerService, 
Optional<UserQuotaManager> quotaManager, Optional<Logger> log,
-      @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean warmStandbyEnabled) 
throws Exception {
+      @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean warmStandbyEnabled,
+      Optional<FlowTriggerHandler> flowTriggerHandler) throws Exception {
     super(ConfigUtils.configToProperties(config), schedulerService);
 
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
@@ -179,6 +182,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
         && config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED);
     this.warmStandbyEnabled = warmStandbyEnabled;
     this.quotaManager = quotaManager;
+    this.flowTriggerHandler = flowTriggerHandler;
     // Check that these metrics do not exist before adding, mainly for testing 
purpose which creates multiple instances
     // of the scheduler. If one metric exists, then the others should as well.
     MetricFilter filter = 
MetricFilter.contains(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_GET_SPECS_DURING_STARTUP_PER_SPEC_RATE_NANOS);
@@ -198,11 +202,13 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
   }
 
   public GobblinServiceJobScheduler(String serviceName, Config config, 
FlowStatusGenerator flowStatusGenerator,
-      Optional<HelixManager> helixManager,
-      Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> 
topologyCatalog, Optional<DagManager> dagManager, Optional<UserQuotaManager> 
quotaManager,
-      SchedulerService schedulerService,  Optional<Logger> log, boolean 
warmStandbyEnabled) throws Exception {
+      Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, 
Optional<TopologyCatalog> topologyCatalog,
+      Optional<DagManager> dagManager, Optional<UserQuotaManager> 
quotaManager, SchedulerService schedulerService,
+      Optional<Logger> log, boolean warmStandbyEnabled, 
Optional<FlowTriggerHandler> flowTriggerHandler)
+      throws Exception {
     this(serviceName, config, helixManager, flowCatalog, topologyCatalog,
-        new Orchestrator(config, flowStatusGenerator, topologyCatalog, 
dagManager, log), schedulerService, quotaManager, log, warmStandbyEnabled);
+        new Orchestrator(config, flowStatusGenerator, topologyCatalog, 
dagManager, log, flowTriggerHandler),
+        schedulerService, quotaManager, log, warmStandbyEnabled, 
flowTriggerHandler);
   }
 
   public synchronized void setActive(boolean isActive) {
@@ -440,7 +446,9 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
   public void runJob(Properties jobProps, JobListener jobListener) throws 
JobException {
     try {
       Spec flowSpec = 
this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
-      this.orchestrator.orchestrate(flowSpec);
+      String triggerTimestampMillis = jobProps.getProperty(
+          ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY, 
"0");
+      this.orchestrator.orchestrate(flowSpec, jobProps, 
Long.parseLong(triggerTimestampMillis));
     } catch (Exception e) {
       throw new JobException("Failed to run Spec: " + 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
     }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 732697038..456851612 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -18,7 +18,8 @@
 package org.apache.gobblin.service.monitoring;
 
 import java.io.IOException;
-import java.sql.SQLException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -34,10 +35,14 @@ import 
org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
 import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 
 
 /**
@@ -52,6 +57,7 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
   // Metrics
   private ContextAwareMeter killsInvoked;
   private ContextAwareMeter resumesInvoked;
+  private ContextAwareMeter flowsLaunched;
   private ContextAwareMeter unexpectedErrors;
   private ContextAwareMeter messageProcessedMeter;
   private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from 
all partitions in one gauge
@@ -71,17 +77,23 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
   protected DagActionStore dagActionStore;
 
   protected DagManager dagManager;
+  protected Orchestrator orchestrator;
+  protected boolean isMultiActiveSchedulerEnabled;
+  protected FlowCatalog flowCatalog;
 
   // Note that the topic is an empty string (rather than null to avoid NPE) 
because this monitor relies on the consumer
   // client itself to determine all Kafka related information dynamically 
rather than through the config.
   public DagActionStoreChangeMonitor(String topic, Config config, 
DagActionStore dagActionStore, DagManager dagManager,
-      int numThreads) {
+      int numThreads, FlowCatalog flowCatalog, Orchestrator orchestrator, 
boolean isMultiActiveSchedulerEnabled) {
     // Differentiate group id for each host
     super(topic, config.withValue(GROUP_ID_KEY,
         ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX + 
UUID.randomUUID().toString())),
         numThreads);
     this.dagActionStore = dagActionStore;
     this.dagManager = dagManager;
+    this.flowCatalog = flowCatalog;
+    this.orchestrator = orchestrator;
+    this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
   }
 
   @Override
@@ -93,7 +105,7 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
 
   @Override
   /*
-  This class is multi-threaded and this message will be called by multiple 
threads, however any given message will be
+  This class is multithreaded and this method will be called by multiple 
threads, however any given message will be
   partitioned and processed by only one thread (and corresponding queue).
    */
   protected void processMessage(DecodeableKafkaRecord message) {
@@ -109,6 +121,8 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
     String flowName = value.getFlowName();
     String flowExecutionId = value.getFlowExecutionId();
 
+    DagActionStore.FlowActionType dagActionType = 
DagActionStore.FlowActionType.valueOf(value.getDagAction().toString());
+
     produceToConsumeDelayValue = calcMillisSince(produceTimestamp);
     log.debug("Processing Dag Action message for flow group: {} name: {} 
executionId: {} tid: {} operation: {} lag: {}",
         flowGroup, flowName, flowExecutionId, tid, operation, 
produceToConsumeDelayValue);
@@ -119,47 +133,37 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
       return;
     }
 
-    // Retrieve the Dag Action taken from MySQL table unless operation is 
DELETE
-    DagActionStore.DagActionValue dagAction = null;
-    if (!operation.equals("DELETE")) {
-      try {
-        dagAction = dagActionStore.getDagAction(flowGroup, flowName, 
flowExecutionId).getDagActionValue();
-      } catch (IOException e) {
-        log.error("Encountered IOException trying to retrieve dagAction for 
flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup, 
flowName, flowExecutionId, e);
-        this.unexpectedErrors.mark();
-        return;
-      } catch (SpecNotFoundException e) {
-        log.error("DagAction not found for flow group: {} name: {} 
executionId: {} Exception: {}", flowGroup, flowName,
-            flowExecutionId, e);
-        this.unexpectedErrors.mark();
-        return;
-      } catch (SQLException throwables) {
-        log.error("Encountered SQLException trying to retrieve dagAction for 
flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup, 
flowName, flowExecutionId, throwables);
-        return;
-      }
-    }
-
-    // We only expert INSERT and DELETE operations done to this table. INSERTs 
correspond to resume or delete flow
-    // requests that have to be processed. DELETEs require no action.
+    // We only expect INSERT and DELETE operations done to this table. INSERTs 
correspond to any type of
+    // {@link DagActionStore.FlowActionType} flow requests that have to be 
processed. DELETEs require no action.
     try {
       if (operation.equals("INSERT")) {
-        if (dagAction.equals(DagActionStore.DagActionValue.RESUME)) {
+        if (dagActionType.equals(DagActionStore.FlowActionType.RESUME)) {
           log.info("Received insert dag action and about to send resume flow 
request");
           dagManager.handleResumeFlowRequest(flowGroup, 
flowName,Long.parseLong(flowExecutionId));
           this.resumesInvoked.mark();
-        } else if (dagAction.equals(DagActionStore.DagActionValue.KILL)) {
+        } else if (dagActionType.equals(DagActionStore.FlowActionType.KILL)) {
           log.info("Received insert dag action and about to send kill flow 
request");
           dagManager.handleKillFlowRequest(flowGroup, flowName, 
Long.parseLong(flowExecutionId));
           this.killsInvoked.mark();
+        } else if (dagActionType.equals(DagActionStore.FlowActionType.LAUNCH)) 
{
+          // If multi-active scheduler is NOT turned on we should not receive 
these type of events
+          if (!this.isMultiActiveSchedulerEnabled) {
+            this.unexpectedErrors.mark();
+            throw new RuntimeException(String.format("Received LAUNCH 
dagAction while not in multi-active scheduler "
+                + "mode for flowAction: %s",
+                new DagActionStore.DagAction(flowGroup, flowName, 
flowExecutionId, dagActionType)));
+          }
+          log.info("Received insert dag action and about to forward launch 
request to DagManager");
+          submitFlowToDagManagerHelper(flowGroup, flowName);
         } else {
-          log.warn("Received unsupported dagAction {}. Expected to be a KILL 
or RESUME", dagAction);
+          log.warn("Received unsupported dagAction {}. Expected to be a KILL, 
RESUME, or LAUNCH", dagActionType);
           this.unexpectedErrors.mark();
           return;
         }
       } else if (operation.equals("UPDATE")) {
         log.warn("Received an UPDATE action to the DagActionStore when values 
in this store are never supposed to be "
             + "updated. Flow group: {} name {} executionId {} were updated to 
action {}", flowGroup, flowName,
-            flowExecutionId, dagAction);
+            flowExecutionId, dagActionType);
         this.unexpectedErrors.mark();
       } else if (operation.equals("DELETE")) {
         log.debug("Deleted flow group: {} name: {} executionId {} from 
DagActionStore", flowGroup, flowName, flowExecutionId);
@@ -177,15 +181,40 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
     dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
   }
 
+  protected void submitFlowToDagManagerHelper(String flowGroup, String 
flowName) {
+    // Retrieve job execution plan by recompiling the flow spec to send to the 
DagManager
+    FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
+    FlowSpec spec = null;
+    try {
+      URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
+      spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+      this.orchestrator.submitFlowToDagManager(spec);
+    } catch (URISyntaxException e) {
+      log.warn("Could not create URI object for flowId {} due to error {}", 
flowId, e.getMessage());
+      this.unexpectedErrors.mark();
+      return;
+    } catch (SpecNotFoundException e) {
+      log.warn("Spec not found for flow group: {} name: {} Exception: {}", 
flowGroup, flowName, e);
+      this.unexpectedErrors.mark();
+      return;
+    } catch (IOException e) {
+      log.warn("Failed to add Job Execution Plan for flow group: {} name: {} 
due to error {}", flowGroup, flowName, e);
+      this.unexpectedErrors.mark();
+      return;
+    }
+    // Only mark this if the dag was successfully added
+    this.flowsLaunched.mark();
+  }
+
   @Override
   protected void createMetrics() {
     super.createMetrics();
     this.killsInvoked = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED);
     this.resumesInvoked = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
+    this.flowsLaunched = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED);
     this.unexpectedErrors = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
     this.messageProcessedMeter = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED);
     this.produceToConsumeDelayMillis = 
this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS,
 () -> produceToConsumeDelayValue);
     this.getMetricContext().register(this.produceToConsumeDelayMillis);
   }
-
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
index d4a0656b3..5806949a8 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
@@ -22,11 +22,15 @@ import java.util.Objects;
 import com.typesafe.config.Config;
 
 import javax.inject.Inject;
+import javax.inject.Named;
 import javax.inject.Provider;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.runtime.util.InjectionNames;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -40,12 +44,20 @@ public class DagActionStoreChangeMonitorFactory implements 
Provider<DagActionSto
   private final Config config;
   private DagActionStore dagActionStore;
   private DagManager dagManager;
+  private FlowCatalog flowCatalog;
+  private Orchestrator orchestrator;
+  private boolean isMultiActiveSchedulerEnabled;
 
   @Inject
-  public DagActionStoreChangeMonitorFactory(Config config, DagActionStore 
dagActionStore, DagManager dagManager) {
+  public DagActionStoreChangeMonitorFactory(Config config, DagActionStore 
dagActionStore, DagManager dagManager,
+      FlowCatalog flowCatalog, Orchestrator orchestrator,
+      @Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean 
isMultiActiveSchedulerEnabled) {
     this.config = Objects.requireNonNull(config);
     this.dagActionStore = dagActionStore;
     this.dagManager = dagManager;
+    this.flowCatalog = flowCatalog;
+    this.orchestrator = orchestrator;
+    this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
   }
 
   private DagActionStoreChangeMonitor createDagActionStoreMonitor()
@@ -56,7 +68,8 @@ public class DagActionStoreChangeMonitorFactory implements 
Provider<DagActionSto
     String topic = ""; // Pass empty string because we expect underlying 
client to dynamically determine the Kafka topic
     int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig, 
DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
 
-    return new DagActionStoreChangeMonitor(topic, dagActionStoreChangeConfig, 
this.dagActionStore, this.dagManager, numThreads);
+    return new DagActionStoreChangeMonitor(topic, dagActionStoreChangeConfig, 
this.dagActionStore, this.dagManager,
+        numThreads, flowCatalog, orchestrator, isMultiActiveSchedulerEnabled);
   }
 
   @Override
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
index f63fa6624..4d2771368 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
@@ -42,7 +42,7 @@ public class SpecStoreChangeMonitorFactory implements 
Provider<SpecStoreChangeMo
   private GobblinServiceJobScheduler scheduler;
 
   @Inject
-  public SpecStoreChangeMonitorFactory(Config config,FlowCatalog flowCatalog, 
GobblinServiceJobScheduler scheduler) {
+  public SpecStoreChangeMonitorFactory(Config config, FlowCatalog flowCatalog, 
GobblinServiceJobScheduler scheduler) {
     this.config = Objects.requireNonNull(config);
     this.flowCatalog = flowCatalog;
     this.scheduler = scheduler;
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index facdca98c..e8c4fd443 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -84,13 +84,13 @@ public class DagManagerFlowTest {
         .build();
 
     dagActionStore = new MysqlDagActionStore(config);
-    dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.DagActionValue.KILL);
-    dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId_2, 
DagActionStore.DagActionValue.RESUME);
+    dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId, 
DagActionStore.FlowActionType.KILL);
+    dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId_2, 
DagActionStore.FlowActionType.RESUME);
     dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props), 
false);
     dagManager.dagActionStore = Optional.of(dagActionStore);
     dagManager.setActive(true);
     this.dagNumThreads = dagManager.getNumThreads();
-    Thread.sleep(10000);
+    Thread.sleep(30000);
     // On active, should proceed request and delete action entry
     Assert.assertEquals(dagActionStore.getDagActions().size(), 0);
   }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index b1cec2b3a..fafcc9605 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -79,6 +79,7 @@ public class OrchestratorTest {
   private SpecCatalogListener mockListener;
   private FlowSpec flowSpec;
   private FlowStatusGenerator mockStatusGenerator;
+  private FlowTriggerHandler _mockFlowTriggerHandler;
   private Orchestrator orchestrator;
 
   @BeforeClass
@@ -107,9 +108,10 @@ public class OrchestratorTest {
     this.serviceLauncher.addService(flowCatalog);
     this.mockStatusGenerator = mock(FlowStatusGenerator.class);
 
+    this._mockFlowTriggerHandler = mock(FlowTriggerHandler.class);
     this.orchestrator = new 
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
-        this.mockStatusGenerator,
-        Optional.of(this.topologyCatalog), Optional.<DagManager>absent(), 
Optional.of(logger));
+        this.mockStatusGenerator, Optional.of(this.topologyCatalog), 
Optional.<DagManager>absent(), Optional.of(logger),
+         Optional.of(this._mockFlowTriggerHandler));
     this.topologyCatalog.addListener(orchestrator);
     this.flowCatalog.addListener(orchestrator);
     // Start application
@@ -341,13 +343,13 @@ public class OrchestratorTest {
     flowProps.put("gobblin.flow.destinationIdentifier", "destination");
     flowProps.put("flow.allowConcurrentExecution", false);
     FlowSpec adhocSpec = new FlowSpec(URI.create("flow0/group0"), "1", "", 
ConfigUtils.propertiesToConfig(flowProps) , flowProps, Optional.absent(), 
Optional.absent());
-    this.orchestrator.orchestrate(adhocSpec);
+    this.orchestrator.orchestrate(adhocSpec, flowProps, 0);
     String metricName = 
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group0", 
"flow0", ServiceMetricNames.COMPILED);
     
Assert.assertNull(metricContext.getParent().get().getGauges().get(metricName));
 
     flowProps.setProperty("job.schedule", "0/2 * * * * ?");
     FlowSpec scheduledSpec = new FlowSpec(URI.create("flow0/group0"), "1", "", 
ConfigUtils.propertiesToConfig(flowProps) , flowProps, Optional.absent(), 
Optional.absent());
-    this.orchestrator.orchestrate(scheduledSpec);
+    this.orchestrator.orchestrate(scheduledSpec, flowProps, 0);
     
Assert.assertNotNull(metricContext.getParent().get().getGauges().get(metricName));
   }
 }
\ No newline at end of file
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index 22f060bbd..6db4a83e3 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -60,6 +60,7 @@ import org.apache.gobblin.service.modules.flowgraph.Dag;
 import 
org.apache.gobblin.service.modules.orchestration.AbstractUserQuotaManager;
 import 
org.apache.gobblin.service.modules.orchestration.InMemoryUserQuotaManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
@@ -348,7 +349,8 @@ public class GobblinServiceJobSchedulerTest {
     SchedulerService schedulerService = new SchedulerService(new Properties());
     // Mock a GaaS scheduler not in warm standby mode
     GobblinServiceJobScheduler scheduler = new 
GobblinServiceJobScheduler("testscheduler",
-        ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), 
null, mockOrchestrator, schedulerService, Optional.of(new 
InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), false);
+        ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), 
null, mockOrchestrator, schedulerService, Optional.of(new 
InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), false, 
Optional.of(Mockito.mock(
+        FlowTriggerHandler.class)));
 
     schedulerService.startAsync().awaitRunning();
     scheduler.startUp();
@@ -366,7 +368,8 @@ public class GobblinServiceJobSchedulerTest {
 
     //Mock a GaaS scheduler in warm standby mode, where we don't check quota
     GobblinServiceJobScheduler schedulerWithWarmStandbyEnabled = new 
GobblinServiceJobScheduler("testscheduler",
-        ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), 
null, mockOrchestrator, schedulerService, Optional.of(new 
InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), true);
+        ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), 
null, mockOrchestrator, schedulerService, Optional.of(new 
InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), true, 
Optional.of(Mockito.mock(
+        FlowTriggerHandler.class)));
 
     schedulerWithWarmStandbyEnabled.startUp();
     schedulerWithWarmStandbyEnabled.setActive(true);
@@ -388,7 +391,8 @@ public class GobblinServiceJobSchedulerTest {
     public TestGobblinServiceJobScheduler(String serviceName, Config config,
         Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> 
topologyCatalog, Orchestrator orchestrator, Optional<UserQuotaManager> 
quotaManager,
         SchedulerService schedulerService, boolean isWarmStandbyEnabled) 
throws Exception {
-      super(serviceName, config, Optional.absent(), flowCatalog, 
topologyCatalog, orchestrator, schedulerService, quotaManager, 
Optional.absent(), isWarmStandbyEnabled);
+      super(serviceName, config, Optional.absent(), flowCatalog, 
topologyCatalog, orchestrator, schedulerService, quotaManager, 
Optional.absent(), isWarmStandbyEnabled, Optional.of(Mockito.mock(
+          FlowTriggerHandler.class)));
       if (schedulerService != null) {
         hasScheduler = true;
       }

Reply via email to