This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new faa3a4f60 [GOBBLIN-1837] Implement multi-active, non blocking for
leader host (#3700)
faa3a4f60 is described below
commit faa3a4f60b6ef763d620245b1535b75728a9802e
Author: umustafi <[email protected]>
AuthorDate: Thu Jun 15 10:10:07 2023 -0700
[GOBBLIN-1837] Implement multi-active, non blocking for leader host (#3700)
* basic outline of changes to make, started SchedulerLeaseDeterminationStore
* multiple query options for store
* add launch type as column
* wip for scheduler abstractions
* non blocking algo impl, dag action store updates
* DagActionMonitor changes to handle LAUNCH events
* clean up comments, add docstrings
* redefined lease arbiter & algo handler to separate scheduler specific
logic from general lease handler
* Address second round of review comments
* Cleanup in response to review, fix failing test
* small clean ups
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../gobblin/configuration/ConfigurationKeys.java | 15 +
.../apache/gobblin/service/ServiceConfigKeys.java | 1 +
.../src/main/avro/DagActionStoreChangeEvent.avsc | 18 +
.../apache/gobblin/runtime/api/DagActionStore.java | 55 ++-
.../runtime/api/MultiActiveLeaseArbiter.java | 103 ++++++
.../runtime/api/MysqlMultiActiveLeaseArbiter.java | 391 +++++++++++++++++++++
.../dag_action_store/MysqlDagActionStore.java | 64 ++--
.../gobblin/runtime/metrics/RuntimeMetrics.java | 3 +
.../gobblin/runtime/util/InjectionNames.java | 3 +
.../org/apache/gobblin/scheduler/JobScheduler.java | 15 +-
.../dag_action_store/MysqlDagActionStoreTest.java | 43 +--
.../modules/core/GobblinServiceConfiguration.java | 4 +
.../modules/core/GobblinServiceGuiceModule.java | 12 +
.../service/modules/orchestration/DagManager.java | 21 +-
.../modules/orchestration/FlowTriggerHandler.java | 179 ++++++++++
.../modules/orchestration/Orchestrator.java | 62 +++-
.../modules/orchestration/TimingEventUtils.java | 4 +-
...lowExecutionResourceHandlerWithWarmStandby.java | 53 ++-
.../scheduler/GobblinServiceJobScheduler.java | 20 +-
.../monitoring/DagActionStoreChangeMonitor.java | 89 +++--
.../DagActionStoreChangeMonitorFactory.java | 17 +-
.../monitoring/SpecStoreChangeMonitorFactory.java | 2 +-
.../modules/orchestration/DagManagerFlowTest.java | 6 +-
.../modules/orchestration/OrchestratorTest.java | 10 +-
.../scheduler/GobblinServiceJobSchedulerTest.java | 10 +-
25 files changed, 1000 insertions(+), 200 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index f0e15bf94..b155e8089 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -95,6 +95,21 @@ public class ConfigurationKeys {
public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS =
"skip.scheduling.flows.after.num.days";
public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
+ // Scheduler lease determination store configuration
+ public static final String MYSQL_LEASE_ARBITER_PREFIX =
"MysqlMultiActiveLeaseArbiter";
+ public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
+ public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE
= MYSQL_LEASE_ARBITER_PREFIX +
".gobblin_multi_active_scheduler_constants_store";
+ public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY
= MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiterTable";
+ public static final String
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE =
MYSQL_LEASE_ARBITER_PREFIX + ".gobblin_scheduler_lease_determination_store";
+ public static final String SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY =
"eventToRevisitTimestampMillis";
+ public static final String SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY =
"triggerEventTimestampMillis";
+ public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY =
MYSQL_LEASE_ARBITER_PREFIX + ".epsilonMillis";
+ public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 5000;
+ // Note: linger should be on the order of seconds even though we measure in
millis
+ public static final String SCHEDULER_EVENT_LINGER_MILLIS_KEY =
MYSQL_LEASE_ARBITER_PREFIX + ".lingerMillis";
+ public static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 30000;
+ public static final String SCHEDULER_MAX_BACKOFF_MILLIS_KEY =
MYSQL_LEASE_ARBITER_PREFIX + ".maxBackoffMillis";
+ public static final int DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS = 5000;
// Job executor thread pool size
public static final String JOB_EXECUTOR_THREAD_POOL_SIZE_KEY =
"jobexecutor.threadpool.size";
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index ef4323538..21b32b58c 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -41,6 +41,7 @@ public class ServiceConfigKeys {
public static final boolean DEFAULT_GOBBLIN_SERVICE_DAG_MANAGER_ENABLED =
false;
public static final String GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY =
GOBBLIN_SERVICE_PREFIX + "jobStatusMonitor.enabled";
public static final String GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY =
GOBBLIN_SERVICE_PREFIX + "warmStandby.enabled";
+ public static final String
GOBBLIN_SERVICE_MULTI_ACTIVE_SCHEDULER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX +
"multiActiveScheduler.enabled";
// If true, will mark up/down d2 servers on leadership so that all requests
will be routed to the leader node
public static final String GOBBLIN_SERVICE_D2_ONLY_ANNOUNCE_LEADER =
GOBBLIN_SERVICE_PREFIX + "d2.onlyAnnounceLeader";
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
index 268f18ad0..b628f1714 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/DagActionStoreChangeEvent.avsc
@@ -23,6 +23,24 @@
"type" : "string",
"doc" : "flow execution id for the dag action",
"compliance" : "NONE"
+ }, {
+ "name" : "dagAction",
+ "type": {
+ "type": "enum",
+ "name": "DagActionValue",
+ "symbols": [
+ "KILL",
+ "RESUME",
+ "LAUNCH"
+ ],
+ "symbolDocs": {
+ "KILL": "Kill the flow corresponding to this dag",
+ "RESUME": "Resume or start a new flow corresponding to this dag",
+ "LAUNCH": "Launch a new execution of the flow corresponding to this
dag"
+ }
+ },
+ "doc" : "type of dag action",
+ "compliance" : "NONE"
}
]
}
\ No newline at end of file
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
index 5da8e6d31..a1a0ea237 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
@@ -20,29 +20,26 @@ package org.apache.gobblin.runtime.api;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Collection;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
+
+import lombok.Data;
public interface DagActionStore {
- enum DagActionValue {
- KILL,
- RESUME
+ enum FlowActionType {
+ KILL, // Kill invoked through API call
+ RESUME, // Resume flow invoked through API call
+ LAUNCH, // Launch new flow execution invoked adhoc or through scheduled
trigger
+ RETRY, // Invoked through DagManager for flows configured to allow retries
+ CANCEL, // Invoked through DagManager if flow has been stuck in
Orchestrated state for a while
+ ADVANCE // Launch next step in multi-hop dag
}
- @Getter
- @EqualsAndHashCode
+ @Data
class DagAction {
- String flowGroup;
- String flowName;
- String flowExecutionId;
- DagActionValue dagActionValue;
- public DagAction(String flowGroup, String flowName, String
flowExecutionId, DagActionValue dagActionValue) {
- this.flowGroup = flowGroup;
- this.flowName = flowName;
- this.flowExecutionId = flowExecutionId;
- this.dagActionValue = dagActionValue;
- }
+ final String flowGroup;
+ final String flowName;
+ final String flowExecutionId;
+ final FlowActionType flowActionType;
}
@@ -51,40 +48,28 @@ public interface DagActionStore {
* @param flowGroup flow group for the dag action
* @param flowName flow name for the dag action
* @param flowExecutionId flow execution for the dag action
+ * @param flowActionType the value of the dag action
* @throws IOException
*/
- boolean exists(String flowGroup, String flowName, String flowExecutionId)
throws IOException, SQLException;
+ boolean exists(String flowGroup, String flowName, String flowExecutionId,
FlowActionType flowActionType) throws IOException, SQLException;
/**
* Persist the dag action in {@link DagActionStore} for durability
* @param flowGroup flow group for the dag action
* @param flowName flow name for the dag action
* @param flowExecutionId flow execution for the dag action
- * @param dagActionValue the value of the dag action
+ * @param flowActionType the value of the dag action
* @throws IOException
*/
- void addDagAction(String flowGroup, String flowName, String flowExecutionId,
DagActionValue dagActionValue) throws IOException;
+ void addDagAction(String flowGroup, String flowName, String flowExecutionId,
FlowActionType flowActionType) throws IOException;
/**
* delete the dag action from {@link DagActionStore}
- * @param flowGroup flow group for the dag action
- * @param flowName flow name for the dag action
- * @param flowExecutionId flow execution for the dag action
+ * @param dagAction the {@link DagAction} containing all information needed to identify the dag and the
specific action value
* @throws IOException
* @return true if we successfully delete one record, return false if the
record does not exist
*/
- boolean deleteDagAction(String flowGroup, String flowName, String
flowExecutionId) throws IOException;
-
- /***
- * Retrieve action value by the flow group, flow name and flow execution id
from the {@link DagActionStore}.
- * @param flowGroup flow group for the dag action
- * @param flowName flow name for the dag action
- * @param flowExecutionId flow execution for the dag action
- * @throws IOException Exception in retrieving the {@link DagAction}.
- * @throws SpecNotFoundException If {@link DagAction} being retrieved is not
present in store.
- */
- DagAction getDagAction(String flowGroup, String flowName, String
flowExecutionId) throws IOException, SpecNotFoundException,
-
SQLException;
+ boolean deleteDagAction(DagAction dagAction) throws IOException;
/***
* Get all {@link DagAction}s from the {@link DagActionStore}.
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
new file mode 100644
index 000000000..ab9e03599
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+
+import lombok.Data;
+
+
+/**
+ * This interface defines a generic approach to a non-blocking, multiple
active thread or host system, in which one or
more active participants compete to take responsibility for a particular
flow's event. The type of flow event in
+ * question does not impact the algorithm other than to uniquely identify the
flow event. Each participant uses the
+ * interface to initiate an attempt at ownership over the flow event and
receives a response indicating the status of
+ * the attempt.
+ *
+ * At a high level the lease arbiter works as follows:
+ * 1. Multiple participants independently learn of a flow action event to act
upon
+ * 2. Each participant attempts to acquire rights or `a lease` to be the sole
participant acting on the event by
+ * calling the tryAcquireLease method below and receives the resulting
status. The status indicates whether this
+ * participant has
+ * a) LeaseObtainedStatus -> this participant will attempt to carry out
the required action before the lease expires
+ * b) LeasedToAnotherStatus -> another will attempt to carry out the
required action before the lease expires
+ * c) NoLongerLeasingStatus -> flow event no longer needs to be acted
upon (terminal state)
+ * 3. If another participant has acquired the lease before this one could,
then the present participant must check back
+ * in at the time of lease expiry to see if it needs to attempt the lease
again [status (b) above].
+ * 4. Once the participant which acquired the lease completes its work on the
flow event, it calls recordLeaseSuccess
+ * to indicate to all other participants that the flow event no longer
needs to be acted upon [status (c) above]
+ */
+public interface MultiActiveLeaseArbiter {
+ /**
+ * This method attempts to insert an entry into store for a particular flow
action event if one does not already
+ * exist in the store for the flow action or has expired. Regardless of the
outcome it also reads the lease
+ * acquisition timestamp of the entry for that flow action event (it could
have pre-existed in the table or been newly
+ * added by the previous write). Based on the transaction results, it will
return {@link LeaseAttemptStatus} to
+ * determine the next action.
+ * @param flowAction uniquely identifies the flow and the present action
upon it
+ * @param eventTimeMillis is the time this flow action was triggered
+ * @return LeaseAttemptStatus
+ * @throws IOException
+ */
+ LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long
eventTimeMillis) throws IOException;
+
+ /**
+ * This method is used to indicate the owner of the lease has successfully
completed required actions while holding
+ * the lease of the flow action event. It marks the lease as "no longer
leasing", if the eventTimeMillis and
+ * leaseAcquisitionTimeMillis values have not changed since this owner
acquired the lease (indicating the lease did
+ * not expire).
+ * @return true if successfully updated, indicating no further actions need
to be taken regarding this event.
+ * false if failed to update the lease properly, the caller should
continue seeking to acquire the lease as
+ * if any actions it did successfully accomplish, do not count
+ */
+ boolean recordLeaseSuccess(LeaseObtainedStatus status) throws IOException;
+
+ /*
+ Class used to encapsulate status of lease acquisition attempt and
derivations should contain information specific to
+ the status that results.
+ */
+ abstract class LeaseAttemptStatus {}
+
+ class NoLongerLeasingStatus extends LeaseAttemptStatus {}
+
+ /*
+ The participant calling this method acquired the lease for the event in
question. The class contains the
+ `eventTimestamp` associated with the lease as well as the time the caller
obtained the lease or
+ `leaseAcquisitionTimestamp`.
+ */
+ @Data
+ class LeaseObtainedStatus extends LeaseAttemptStatus {
+ private final DagActionStore.DagAction flowAction;
+ private final long eventTimestamp;
+ private final long leaseAcquisitionTimestamp;
+ }
+
+ /*
+ This flow action event already has a valid lease owned by another
participant.
+ `eventTimeMillis` is the timestamp the lease is associated with, which may
be a different timestamp for the same flow
+ action corresponding to the same instance of the event or a distinct one.
+ `minimumLingerDurationMillis` is the minimum amount of time to wait before
this participant should return to check if
+ the lease has completed or expired
+ */
+ @Data
+ class LeasedToAnotherStatus extends LeaseAttemptStatus {
+ private final DagActionStore.DagAction flowAction;
+ private final long eventTimeMillis;
+ private final long minimumLingerDurationMillis;
+}
+}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
new file mode 100644
index 000000000..8a40c71b2
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import javax.sql.DataSource;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * MySQL based implementation of the {@link MultiActiveLeaseArbiter} which
uses a MySQL store to resolve ownership of
+ * a flow event amongst multiple competing participants. A MySQL table is used
to store flow identifying information as
+ * well as the flow action associated with it. It uses two additional values
of the `event_timestamp` and
+ * `lease_acquisition_timestamp` to indicate an active lease, expired lease,
and state of no longer leasing. The table
+ * schema is as follows:
+ * [flow_group | flow_name | flow_execution_id | flow_action | event_timestamp
| lease_acquisition_timestamp]
+ * (----------------------primary key------------------------)
+ * We also maintain another table in the database with two constants that
allow us to coordinate between participants and
+ * ensure they are using the same values to base their coordination off of.
+ * [epsilon | linger]
+ * `epsilon` - time within which we consider two timestamps to be the same, to
account for between-host clock drift
+ * `linger` - minimum time to occur before another host may attempt a lease on
a flow event. It should be much greater
+ * than epsilon and encapsulate executor communication latency
including retry attempts
+ *
+ * The `event_timestamp` is the time of the flow_action event request.
+ * ---Event consolidation---
+ * Note that for the sake of simplification, we only allow one event
associated with a particular flow's flow_action
+ * (ie: only one LAUNCH for example of flow FOO, but there can be a LAUNCH,
KILL, & RESUME for flow FOO at once) during
+ * the time it takes to execute the flow action. In most cases, the execution
time should be so negligible that this
+ * event consolidation of duplicate flow action requests is not noticed and
even during executor downtime this behavior
+ * is acceptable as the user generally expects a timely execution of the most
recent request rather than one execution
+ * per request.
+ *
+ * The `lease_acquisition_timestamp` is the time a host acquired ownership of
this flow action, and it is valid for
+ * `linger` period of time after which it expires and any host can re-attempt
ownership. In most cases, the original
+ * host should actually complete its work while having the lease and then mark
the flow action as NULL to indicate no
+ * further leasing should be done for the event.
+ */
+@Slf4j
+public class MysqlMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
+ /** `j.u.Function` variant for an operation that may @throw IOException or
SQLException: preserves method signature checked exceptions */
+ @FunctionalInterface
+ protected interface CheckedFunction<T, R> {
+ R apply(T t) throws IOException, SQLException;
+ }
+
+ protected final DataSource dataSource;
+ private final String leaseArbiterTableName;
+ private final String constantsTableName;
+ private final int epsilon;
+ private final int linger;
+
+ // TODO: define retention on this table
+ private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE
TABLE IF NOT EXISTS %S ("
+ + "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ")
NOT NULL, flow_name varchar("
+ + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " +
"flow_execution_id varchar("
+ + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL,
flow_action varchar(100) NOT NULL, "
+ + "event_timestamp TIMESTAMP, "
+ + "lease_acquisition_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"
+ + "PRIMARY KEY (flow_group,flow_name,flow_execution_id,flow_action))";
+ private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE
IF NOT EXISTS %s "
+ + "(epsilon INT, linger INT), PRIMARY KEY (epsilon, linger); INSERT INTO
%s (epsilon, linger) VALUES (?,?)";
+ protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ + " AND flow_action=?";
+ protected static final String WHERE_CLAUSE_TO_MATCH_ROW =
WHERE_CLAUSE_TO_MATCH_KEY
+ + " AND event_timestamp=? AND lease_acquisition_timestamp=?";
+ protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT
ROW_COUNT() AS rows_inserted_count, "
+ + "lease_acquisition_timestamp, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+ // Does a cross join between the two tables to have epsilon and linger
values available. Returns the following values:
+ // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if
event_timestamp in table is within
+ // epsilon), leaseValidityStatus (1 if lease has not expired, 2 if expired,
3 if column is NULL or no longer leasing)
+ protected static final String GET_EVENT_INFO_STATEMENT = "SELECT
event_timestamp, lease_acquisition_timestamp, "
+ + "abs(event_timestamp - ?) <= epsilon as isWithinEpsilon, CASE "
+ + "WHEN CURRENT_TIMESTAMP < (lease_acquisition_timestamp + linger) then
1"
+ + "WHEN CURRENT_TIMESTAMP >= (lease_acquisition_timestamp + linger) then
2"
+ + "ELSE 3 END as leaseValidityStatus, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+ // Insert or update row to acquire lease if values have not changed since
the previous read
+ // Need to define three separate statements to handle cases where row does
not exist or has null values to check
+ protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s "
+ + "(flow_group, flow_name, flow_execution_id, flow_action,
event_timestamp) VALUES (?, ?, ?, ?, ?)";
+ protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT = "UPDATE %s "
+ + "SET event_timestamp=?" + WHERE_CLAUSE_TO_MATCH_KEY
+ + " AND event_timestamp=? AND lease_acquisition_timestamp is NULL";
+ protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT = "UPDATE %s "
+ + "SET event_timestamp=?" + WHERE_CLAUSE_TO_MATCH_ROW
+ + " AND event_timestamp=? AND lease_acquisition_timestamp=?";
+ // Complete lease acquisition if values have not changed since lease was
acquired
+ protected static final String CONDITIONALLY_COMPLETE_LEASE_STATEMENT =
"UPDATE %s SET "
+ + "lease_acquisition_timestamp = NULL " + WHERE_CLAUSE_TO_MATCH_ROW;
+
+ @Inject
+ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
+ if (config.hasPath(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX)) {
+ config =
config.getConfig(ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX).withFallback(config);
+ } else {
+ throw new IOException(String.format("Please specify the config for
MysqlMultiActiveLeaseArbiter using prefix %s "
+ + "before all properties",
ConfigurationKeys.MYSQL_LEASE_ARBITER_PREFIX));
+ }
+
+ this.leaseArbiterTableName = ConfigUtils.getString(config,
ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY,
+
ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE);
+ this.constantsTableName = ConfigUtils.getString(config,
ConfigurationKeys.MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY,
+ ConfigurationKeys.DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE);
+ this.epsilon = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS);
+ this.linger = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
+ this.dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement createStatement =
connection.prepareStatement(String.format(
+ CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName))) {
+ createStatement.executeUpdate();
+ connection.commit();
+ } catch (SQLException e) {
+ throw new IOException("Table creation failure for " +
leaseArbiterTableName, e);
+ }
+ withPreparedStatement(String.format(CREATE_CONSTANTS_TABLE_STATEMENT,
this.constantsTableName, this.constantsTableName),
+ createStatement -> {
+ int i = 0;
+ createStatement.setInt(++i, epsilon);
+ createStatement.setInt(++i, linger);
+ return createStatement.executeUpdate();}, true);
+ }
+
+ @Override
+ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction
flowAction, long eventTimeMillis)
+ throws IOException {
+ // Check table for an existing entry for this flow action and event time
+ ResultSet resultSet = withPreparedStatement(
+ String.format(GET_EVENT_INFO_STATEMENT, this.leaseArbiterTableName,
this.constantsTableName),
+ getInfoStatement -> {
+ int i = 0;
+ getInfoStatement.setTimestamp(++i, new Timestamp(eventTimeMillis));
+ getInfoStatement.setString(++i, flowAction.getFlowGroup());
+ getInfoStatement.setString(++i, flowAction.getFlowName());
+ getInfoStatement.setString(++i, flowAction.getFlowExecutionId());
+ getInfoStatement.setString(++i,
flowAction.getFlowActionType().toString());
+ return getInfoStatement.executeQuery();
+ }, true);
+
+ String formattedSelectAfterInsertStatement =
+ String.format(SELECT_AFTER_INSERT_STATEMENT,
this.leaseArbiterTableName, this.constantsTableName);
+ try {
+ // CASE 1: If no existing row for this flow action, then go ahead and
insert
+ if (!resultSet.next()) {
+ String formattedAcquireLeaseNewRowStatement =
+ String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT,
this.leaseArbiterTableName);
+ ResultSet rs = withPreparedStatement(
+ formattedAcquireLeaseNewRowStatement + "; " +
formattedSelectAfterInsertStatement,
+ insertStatement -> {
+ completeInsertPreparedStatement(insertStatement, flowAction,
eventTimeMillis);
+ return insertStatement.executeQuery();
+ }, true);
+ return handleResultFromAttemptedLeaseObtainment(rs, flowAction,
eventTimeMillis);
+ }
+
+ // Extract values from result set
+ Timestamp dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
+ Timestamp dbLeaseAcquisitionTimestamp =
resultSet.getTimestamp("lease_acquisition_timestamp");
+ boolean isWithinEpsilon = resultSet.getBoolean("isWithinEpsilon");
+ int leaseValidityStatus = resultSet.getInt("leaseValidityStatus");
+ int dbLinger = resultSet.getInt("linger");
+
+ // CASE 2: If our event timestamp is older than the last event in db,
then skip this trigger
+ if (eventTimeMillis < dbEventTimestamp.getTime()) {
+ return new NoLongerLeasingStatus();
+ }
+ // Lease is valid
+ if (leaseValidityStatus == 1) {
+ // CASE 3: Same event, lease is valid
+ if (isWithinEpsilon) {
+ // Utilize db timestamp for reminder
+ return new LeasedToAnotherStatus(flowAction,
dbEventTimestamp.getTime(),
+ dbLeaseAcquisitionTimestamp.getTime() + dbLinger -
System.currentTimeMillis());
+ }
+ // CASE 4: Distinct event, lease is valid
+ // Utilize db timestamp for wait time, but be reminded of own event
timestamp
+ return new LeasedToAnotherStatus(flowAction, eventTimeMillis,
+ dbLeaseAcquisitionTimestamp.getTime() + dbLinger -
System.currentTimeMillis());
+ }
+ // CASE 5: Lease is out of date (regardless of whether same or distinct
event)
+ else if (leaseValidityStatus == 2) {
+ if (isWithinEpsilon) {
+ log.warn("Lease should not be out of date for the same trigger event
since epsilon << linger for flowAction"
+ + " {}, db eventTimestamp {}, db leaseAcquisitionTimestamp
{}, linger {}", flowAction,
+ dbEventTimestamp, dbLeaseAcquisitionTimestamp, dbLinger);
+ }
+ // Use our event to acquire lease, check for previous db
eventTimestamp and leaseAcquisitionTimestamp
+ String formattedAcquireLeaseIfMatchingAllStatement =
+
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT,
this.leaseArbiterTableName);
+ ResultSet rs = withPreparedStatement(
+ formattedAcquireLeaseIfMatchingAllStatement + "; " +
formattedSelectAfterInsertStatement,
+ updateStatement -> {
+ completeUpdatePreparedStatement(updateStatement, flowAction,
eventTimeMillis, true,
+ true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
+ return updateStatement.executeQuery();
+ }, true);
+ return handleResultFromAttemptedLeaseObtainment(rs, flowAction,
eventTimeMillis);
+ } // No longer leasing this event
+ // CASE 6: Same event, no longer leasing event in db: terminate
+ if (isWithinEpsilon) {
+ return new NoLongerLeasingStatus();
+ }
+ // CASE 7: Distinct event, no longer leasing event in db
+ // Use our event to acquire lease, check for previous db
eventTimestamp and NULL leaseAcquisitionTimestamp
+ String formattedAcquireLeaseIfFinishedStatement =
+
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT,
this.leaseArbiterTableName);
+ ResultSet rs = withPreparedStatement(
+ formattedAcquireLeaseIfFinishedStatement + "; " +
formattedSelectAfterInsertStatement,
+ updateStatement -> {
+ completeUpdatePreparedStatement(updateStatement, flowAction,
eventTimeMillis, true,
+ false, dbEventTimestamp, null);
+ return updateStatement.executeQuery();
+ }, true);
+ return handleResultFromAttemptedLeaseObtainment(rs, flowAction,
eventTimeMillis);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Attempt lease by insert or update following a read based on the condition
the state of the table has not changed
+ * since the read. Parse the result to return the corresponding status based
on successful insert/update or not.
+ * @param resultSet
+ * @param eventTimeMillis
+ * @return LeaseAttemptStatus
+ * @throws SQLException
+ * @throws IOException
+ */
+ protected LeaseAttemptStatus
handleResultFromAttemptedLeaseObtainment(ResultSet resultSet,
+ DagActionStore.DagAction flowAction, long eventTimeMillis)
+ throws SQLException, IOException {
+ if (!resultSet.next()) {
+ throw new IOException("Expected num rows and lease_acquisition_timestamp
returned from query but received nothing");
+ }
+ int numRowsUpdated = resultSet.getInt(1);
+ long leaseAcquisitionTimeMillis = resultSet.getTimestamp(2).getTime();
+ int dbLinger = resultSet.getInt(3);
+ if (numRowsUpdated == 1) {
+ return new LeaseObtainedStatus(flowAction, eventTimeMillis,
leaseAcquisitionTimeMillis);
+ }
+ // Another participant acquired lease in between
+ return new LeasedToAnotherStatus(flowAction, eventTimeMillis,
+ leaseAcquisitionTimeMillis + dbLinger - System.currentTimeMillis());
+ }
+
+ /**
+ * Complete the INSERT statement for a new flow action lease where the flow
action is not present in the table
+ * @param statement
+ * @param flowAction
+ * @param eventTimeMillis
+ * @throws SQLException
+ */
+ protected void completeInsertPreparedStatement(PreparedStatement statement,
DagActionStore.DagAction flowAction,
+ long eventTimeMillis) throws SQLException {
+ int i = 0;
+ // Values to set in new row
+ statement.setString(++i, flowAction.getFlowGroup());
+ statement.setString(++i, flowAction.getFlowName());
+ statement.setString(++i, flowAction.getFlowExecutionId());
+ statement.setString(++i, flowAction.getFlowActionType().toString());
+ statement.setTimestamp(++i, new Timestamp(eventTimeMillis));
+ // Values to check if existing row matches previous read
+ statement.setString(++i, flowAction.getFlowGroup());
+ statement.setString(++i, flowAction.getFlowName());
+ statement.setString(++i, flowAction.getFlowExecutionId());
+ statement.setString(++i, flowAction.getFlowActionType().toString());
+ // Values to select for return
+ statement.setString(++i, flowAction.getFlowGroup());
+ statement.setString(++i, flowAction.getFlowName());
+ statement.setString(++i, flowAction.getFlowExecutionId());
+ statement.setString(++i, flowAction.getFlowActionType().toString());
+ }
+
+ /**
+ * Complete the UPDATE prepared statements for a flow action that already
exists in the table that needs to be
+ * updated.
+ * @param statement
+ * @param flowAction
+ * @param eventTimeMillis
+ * @param needEventTimeCheck true if need to compare
`originalEventTimestamp` with db event_timestamp
+ * @param needLeaseAcquisitionTimeCheck true if need to compare
`originalLeaseAcquisitionTimestamp` with db one
+ * @param originalEventTimestamp value to compare to db one, null if not
needed
+ * @param originalLeaseAcquisitionTimestamp value to compare to db one, null
if not needed
+ * @throws SQLException
+ */
+ protected void completeUpdatePreparedStatement(PreparedStatement statement,
DagActionStore.DagAction flowAction,
+ long eventTimeMillis, boolean needEventTimeCheck, boolean
needLeaseAcquisitionTimeCheck,
+ Timestamp originalEventTimestamp, Timestamp
originalLeaseAcquisitionTimestamp) throws SQLException {
+ int i = 0;
+ // Value to update
+ statement.setTimestamp(++i, new Timestamp(eventTimeMillis));
+ // Values to check if existing row matches previous read
+ statement.setString(++i, flowAction.getFlowGroup());
+ statement.setString(++i, flowAction.getFlowName());
+ statement.setString(++i, flowAction.getFlowExecutionId());
+ statement.setString(++i, flowAction.getFlowActionType().toString());
+ // Values that may be needed depending on the insert statement
+ if (needEventTimeCheck) {
+ statement.setTimestamp(++i, originalEventTimestamp);
+ }
+ if (needLeaseAcquisitionTimeCheck) {
+ statement.setTimestamp(++i, originalLeaseAcquisitionTimestamp);
+ }
+ // Values to select for return
+ statement.setString(++i, flowAction.getFlowGroup());
+ statement.setString(++i, flowAction.getFlowName());
+ statement.setString(++i, flowAction.getFlowExecutionId());
+ statement.setString(++i, flowAction.getFlowActionType().toString());
+ }
+
+ @Override
+ public boolean recordLeaseSuccess(LeaseObtainedStatus status)
+ throws IOException {
+ DagActionStore.DagAction flowAction = status.getFlowAction();
+ String flowGroup = flowAction.getFlowGroup();
+ String flowName = flowAction.getFlowName();
+ String flowExecutionId = flowAction.getFlowExecutionId();
+ DagActionStore.FlowActionType flowActionType =
flowAction.getFlowActionType();
+ return
withPreparedStatement(String.format(CONDITIONALLY_COMPLETE_LEASE_STATEMENT,
leaseArbiterTableName),
+ updateStatement -> {
+ int i = 0;
+ updateStatement.setString(++i, flowGroup);
+ updateStatement.setString(++i, flowName);
+ updateStatement.setString(++i, flowExecutionId);
+ updateStatement.setString(++i, flowActionType.toString());
+ updateStatement.setTimestamp(++i, new
Timestamp(status.getEventTimestamp()));
+ updateStatement.setTimestamp(++i, new
Timestamp(status.getLeaseAcquisitionTimestamp()));
+ int numRowsUpdated = updateStatement.executeUpdate();
+ if (numRowsUpdated == 0) {
+ log.info("Multi-active lease arbiter lease attempt: [%s,
eventTimestamp: %s] - FAILED to complete because "
+ + "lease expired or event cleaned up before host completed
required actions", flowAction,
+ status.getEventTimestamp());
+ return false;
+ }
+ if( numRowsUpdated == 1) {
+ log.info("Multi-active lease arbiter lease attempt: [%s,
eventTimestamp: %s] - COMPLETED, no longer leasing"
+ + " this event after this.", flowAction,
status.getEventTimestamp());
+ return true;
+ };
+ throw new IOException(String.format("Attempt to complete lease use:
[%s, eventTimestamp: %s] - updated more "
+ + "rows than expected", flowAction,
status.getEventTimestamp()));
+ }, true);
+ }
+
+ /** Abstracts recurring pattern around resource management and exception
re-mapping. */
+ protected <T> T withPreparedStatement(String sql,
CheckedFunction<PreparedStatement, T> f, boolean shouldCommit) throws
IOException {
+ try (Connection connection = this.dataSource.getConnection();
+ PreparedStatement statement = connection.prepareStatement(sql)) {
+ T result = f.apply(statement);
+ if (shouldCommit) {
+ connection.commit();
+ }
+ return result;
+ } catch (SQLException e) {
+ log.warn("Received SQL exception that can result from invalid
connection. Checking if validation query is set {} Exception is {}",
((HikariDataSource) this.dataSource).getConnectionTestQuery(), e);
+ throw new IOException(e);
+ }
+ }
+}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
index d3f4db11b..ab5faee8c 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
@@ -43,23 +43,21 @@ import org.apache.gobblin.util.ExponentialBackoff;
public class MysqlDagActionStore implements DagActionStore {
public static final String CONFIG_PREFIX = "MysqlDagActionStore";
- private static final long GET_DAG_ACTION_INITIAL_WAIT_AFTER_FAILURE = 1000L;
-
protected final DataSource dataSource;
private final String tableName;
- private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM
%s WHERE flow_group = ? AND flow_name =? AND flow_execution_id = ?)";
+ private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM
%s WHERE flow_group = ? AND flow_name =? AND flow_execution_id = ? AND
dag_action = ?)";
- protected static final String INSERT_STATEMENT = "INSERT INTO %s
(flow_group, flow_name, flow_execution_id, dag_action ) "
+ protected static final String INSERT_STATEMENT = "INSERT INTO %s
(flow_group, flow_name, flow_execution_id, dag_action) "
+ "VALUES (?, ?, ?, ?)";
- private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE
flow_group = ? AND flow_name =? AND flow_execution_id = ?";
- private static final String GET_STATEMENT = "SELECT flow_group, flow_name,
flow_execution_id, dag_action FROM %s WHERE flow_group = ? AND flow_name =? AND
flow_execution_id = ?";
+ private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE
flow_group = ? AND flow_name =? AND flow_execution_id = ? AND dag_action = ?";
+ private static final String GET_STATEMENT = "SELECT flow_group, flow_name,
flow_execution_id, dag_action FROM %s WHERE flow_group = ? AND flow_name =? AND
flow_execution_id = ? AND dag_action = ?";
private static final String GET_ALL_STATEMENT = "SELECT flow_group,
flow_name, flow_execution_id, dag_action FROM %s";
private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT
EXISTS %s (" +
"flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT
NULL, flow_name varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT
NULL, "
+ "flow_execution_id varchar(" +
ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, "
+ "dag_action varchar(100) NOT NULL, modified_time TIMESTAMP DEFAULT
CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP NOT NULL, "
- + "PRIMARY KEY (flow_group,flow_name,flow_execution_id))";
+ + "PRIMARY KEY (flow_group,flow_name,flow_execution_id, dag_action))";
private final int getDagActionMaxRetries;
@@ -86,7 +84,7 @@ public class MysqlDagActionStore implements DagActionStore {
}
@Override
- public boolean exists(String flowGroup, String flowName, String
flowExecutionId) throws IOException, SQLException {
+ public boolean exists(String flowGroup, String flowName, String
flowExecutionId, FlowActionType flowActionType) throws IOException,
SQLException {
ResultSet rs = null;
try (Connection connection = this.dataSource.getConnection();
PreparedStatement existStatement =
connection.prepareStatement(String.format(EXISTS_STATEMENT, tableName))) {
@@ -94,12 +92,13 @@ public class MysqlDagActionStore implements DagActionStore {
existStatement.setString(++i, flowGroup);
existStatement.setString(++i, flowName);
existStatement.setString(++i, flowExecutionId);
+ existStatement.setString(++i, flowActionType.toString());
rs = existStatement.executeQuery();
rs.next();
return rs.getBoolean(1);
} catch (SQLException e) {
- throw new IOException(String.format("Failure checking existence for
table %s of flow with flow group:%s, flow name:%s and flow execution id:%s",
- tableName, flowGroup, flowName, flowExecutionId), e);
+ throw new IOException(String.format("Failure checking existence of
DagAction: %s in table %s",
+ new DagAction(flowGroup, flowName, flowExecutionId, flowActionType),
tableName), e);
} finally {
if (rs != null) {
rs.close();
@@ -108,7 +107,7 @@ public class MysqlDagActionStore implements DagActionStore {
}
@Override
- public void addDagAction(String flowGroup, String flowName, String
flowExecutionId, DagActionValue dagActionValue)
+ public void addDagAction(String flowGroup, String flowName, String
flowExecutionId, FlowActionType flowActionType)
throws IOException {
try (Connection connection = this.dataSource.getConnection();
PreparedStatement insertStatement =
connection.prepareStatement(String.format(INSERT_STATEMENT, tableName))) {
@@ -116,33 +115,35 @@ public class MysqlDagActionStore implements
DagActionStore {
insertStatement.setString(++i, flowGroup);
insertStatement.setString(++i, flowName);
insertStatement.setString(++i, flowExecutionId);
- insertStatement.setString(++i, dagActionValue.toString());
+ insertStatement.setString(++i, flowActionType.toString());
insertStatement.executeUpdate();
connection.commit();
} catch (SQLException e) {
- throw new IOException(String.format("Failure to adding action for table
%s of flow with flow group:%s, flow name:%s and flow execution id:%s",
- tableName, flowGroup, flowName, flowExecutionId), e);
+ throw new IOException(String.format("Failure adding action for
DagAction: %s in table %s",
+ new DagAction(flowGroup, flowName, flowExecutionId, flowActionType),
tableName), e);
}
}
@Override
- public boolean deleteDagAction(String flowGroup, String flowName, String
flowExecutionId) throws IOException {
+ public boolean deleteDagAction(DagAction dagAction) throws IOException {
try (Connection connection = this.dataSource.getConnection();
PreparedStatement deleteStatement =
connection.prepareStatement(String.format(DELETE_STATEMENT, tableName))) {
int i = 0;
- deleteStatement.setString(++i, flowGroup);
- deleteStatement.setString(++i, flowName);
- deleteStatement.setString(++i, flowExecutionId);
+ deleteStatement.setString(++i, dagAction.getFlowGroup());
+ deleteStatement.setString(++i, dagAction.getFlowName());
+ deleteStatement.setString(++i, dagAction.getFlowExecutionId());
+ deleteStatement.setString(++i, dagAction.getFlowActionType().toString());
int result = deleteStatement.executeUpdate();
connection.commit();
return result != 0;
} catch (SQLException e) {
- throw new IOException(String.format("Failure to delete action for table
%s of flow with flow group:%s, flow name:%s and flow execution id:%s",
- tableName, flowGroup, flowName, flowExecutionId), e);
+ throw new IOException(String.format("Failure deleting action for
DagAction: %s in table %s", dagAction,
+ tableName), e);
}
}
- private DagAction getDagActionWithRetry(String flowGroup, String flowName,
String flowExecutionId, ExponentialBackoff exponentialBackoff)
+ // TODO: later change this to getDagActions relating to a particular flow
execution if it makes sense
+ private DagAction getDagActionWithRetry(String flowGroup, String flowName,
String flowExecutionId, FlowActionType flowActionType, ExponentialBackoff
exponentialBackoff)
throws IOException, SQLException {
ResultSet rs = null;
try (Connection connection = this.dataSource.getConnection();
@@ -151,20 +152,22 @@ public class MysqlDagActionStore implements
DagActionStore {
getStatement.setString(++i, flowGroup);
getStatement.setString(++i, flowName);
getStatement.setString(++i, flowExecutionId);
+ getStatement.setString(++i, flowActionType.toString());
rs = getStatement.executeQuery();
if (rs.next()) {
- return new DagAction(rs.getString(1), rs.getString(2),
rs.getString(3), DagActionValue.valueOf(rs.getString(4)));
+ return new DagAction(rs.getString(1), rs.getString(2),
rs.getString(3), FlowActionType.valueOf(rs.getString(4)));
} else {
if (exponentialBackoff.awaitNextRetryIfAvailable()) {
- return getDagActionWithRetry(flowGroup, flowName, flowExecutionId,
exponentialBackoff);
+ return getDagActionWithRetry(flowGroup, flowName, flowExecutionId,
flowActionType, exponentialBackoff);
} else {
- log.warn(String.format("Can not find dag action with flowGroup: %s,
flowName: %s, flowExecutionId: %s",flowGroup, flowName, flowExecutionId));
+ log.warn(String.format("Can not find dag action: %s with flowGroup:
%s, flowName: %s, flowExecutionId: %s",
+ flowActionType, flowGroup, flowName, flowExecutionId));
return null;
}
}
} catch (SQLException | InterruptedException e) {
- throw new IOException(String.format("Failure get dag action from table
%s of flow with flow group:%s, flow name:%s and flow execution id:%s",
- tableName, flowGroup, flowName, flowExecutionId), e);
+ throw new IOException(String.format("Failure get %s from table %s", new
DagAction(flowGroup, flowName, flowExecutionId,
+ flowActionType), tableName), e);
} finally {
if (rs != null) {
rs.close();
@@ -173,13 +176,6 @@ public class MysqlDagActionStore implements DagActionStore
{
}
- @Override
- public DagAction getDagAction(String flowGroup, String flowName, String
flowExecutionId)
- throws IOException, SQLException {
- ExponentialBackoff exponentialBackoff =
ExponentialBackoff.builder().initialDelay(GET_DAG_ACTION_INITIAL_WAIT_AFTER_FAILURE).maxRetries(this.getDagActionMaxRetries).build();
- return getDagActionWithRetry(flowGroup, flowName, flowExecutionId,
exponentialBackoff);
- }
-
@Override
public Collection<DagAction> getDagActions() throws IOException {
HashSet<DagAction> result = new HashSet<>();
@@ -188,7 +184,7 @@ public class MysqlDagActionStore implements DagActionStore {
ResultSet rs = getAllStatement.executeQuery()) {
while (rs.next()) {
result.add(
- new DagAction(rs.getString(1), rs.getString(2), rs.getString(3),
DagActionValue.valueOf(rs.getString(4))));
+ new DagAction(rs.getString(1), rs.getString(2), rs.getString(3),
FlowActionType.valueOf(rs.getString(4))));
}
if (rs != null) {
rs.close();
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index 3d9e9b5c5..dfccb0c07 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -46,6 +46,7 @@ public class RuntimeMetrics {
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".dagActionStoreMonitor.kills.invoked";
public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED=
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".dagActionStoreMonitor.message.processed";
public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED
= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".dagActionStoreMonitor.resumes.invoked";
+ public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".dagActionStoreMonitor.flows.launched";
public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".dagActionStoreMonitor.unexpected.errors";
public static final String
GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".dagActionStoreMonitor.produce.to.consume.delay";
@@ -72,6 +73,8 @@ public class RuntimeMetrics {
public static final String GOBBLIN_JOB_SCHEDULER_TOTAL_GET_SPEC_TIME_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.totalGetSpecTimeNanos";
public static final String GOBBLIN_JOB_SCHEDULER_TOTAL_ADD_SPEC_TIME_NANOS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.totalAddSpecTimeNanos";
public static final String
GOBBLIN_JOB_SCHEDULER_NUM_JOBS_SCHEDULED_DURING_STARTUP =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".jobScheduler.numJobsScheduledDuringStartup";
+ // Metrics Used to Track flowTriggerHandlerProgress
+ public static final String GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED
= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".flowTriggerHandler.numFlowsSubmitted";
// Metadata keys
public static final String TOPIC = "topic";
public static final String GROUP_ID = "groupId";
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java
index d0e42f525..b9ff94f8a 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/util/InjectionNames.java
@@ -25,5 +25,8 @@ public final class InjectionNames {
public static final String SERVICE_NAME = "serviceName";
public static final String FORCE_LEADER = "forceLeader";
public static final String FLOW_CATALOG_LOCAL_COMMIT =
"flowCatalogLocalCommit";
+
+ // TODO: Rename `warm_standby_enabled` config to
`message_forwarding_enabled` since it's a misnomer.
public static final String WARM_STANDBY_ENABLED = "statelessRestAPIEnabled";
+ public static final String MULTI_ACTIVE_SCHEDULER_ENABLED =
"multiActiveSchedulerEnabled";
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
index 1ae73e934..56b1ac8c0 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
@@ -396,7 +396,7 @@ public class JobScheduler extends AbstractIdleService {
try {
// Schedule the Quartz job with a trigger built from the job
configuration
- Trigger trigger = getTrigger(job.getKey(), jobProps);
+ Trigger trigger = createTriggerForJob(job.getKey(), jobProps);
this.scheduler.getScheduler().scheduleJob(job, trigger);
LOG.info(String.format("Scheduled job %s. Next run: %s.", job.getKey(),
trigger.getNextFireTime()));
} catch (SchedulerException se) {
@@ -581,7 +581,7 @@ public class JobScheduler extends AbstractIdleService {
/**
* Get a {@link org.quartz.Trigger} from the given job configuration
properties.
*/
- private Trigger getTrigger(JobKey jobKey, Properties jobProps) {
+ public static Trigger createTriggerForJob(JobKey jobKey, Properties
jobProps) {
// Build a trigger for the job with the given cron-style schedule
return TriggerBuilder.newTrigger()
.withIdentity(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY),
@@ -600,11 +600,18 @@ public class JobScheduler extends AbstractIdleService {
@Override
public void executeImpl(JobExecutionContext context)
throws JobExecutionException {
- LOG.info("Starting job " + context.getJobDetail().getKey());
- JobDataMap dataMap = context.getJobDetail().getJobDataMap();
+ JobDetail jobDetail = context.getJobDetail();
+ LOG.info("Starting job " + jobDetail.getKey());
+ JobDataMap dataMap = jobDetail.getJobDataMap();
JobScheduler jobScheduler = (JobScheduler)
dataMap.get(JOB_SCHEDULER_KEY);
Properties jobProps = (Properties) dataMap.get(PROPERTIES_KEY);
JobListener jobListener = (JobListener) dataMap.get(JOB_LISTENER_KEY);
+ // Obtain trigger timestamp from trigger to pass to jobProps
+ Trigger trigger = context.getTrigger();
+ // THIS current event has already fired if this method is called, so it
now exists in <previousFireTime>
+ long triggerTimestampMillis = trigger.getPreviousFireTime().getTime();
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
+ String.valueOf(triggerTimestampMillis));
try {
jobScheduler.runJob(jobProps, jobListener);
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStoreTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStoreTest.java
index 0c65f2241..255dd0789 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStoreTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStoreTest.java
@@ -61,40 +61,43 @@ public class MysqlDagActionStoreTest {
@Test
public void testAddAction() throws Exception {
- this.mysqlDagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId, DagActionStore.DagActionValue.KILL);
- //Should not be able to add again when previous one exist
+ this.mysqlDagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId, DagActionStore.FlowActionType.KILL);
+ //Should not be able to add KILL again when previous one exist
Assert.expectThrows(IOException.class,
- () -> this.mysqlDagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId, DagActionStore.DagActionValue.RESUME));
- //Should be able to add un-exist one
- this.mysqlDagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId_2, DagActionStore.DagActionValue.RESUME);
+ () -> this.mysqlDagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId, DagActionStore.FlowActionType.KILL));
+ //Should be able to add a RESUME action for same execution as well as KILL
for another execution of the flow
+ this.mysqlDagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId, DagActionStore.FlowActionType.RESUME);
+ this.mysqlDagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId_2, DagActionStore.FlowActionType.KILL);
}
@Test(dependsOnMethods = "testAddAction")
public void testExists() throws Exception {
- Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName,
flowExecutionId));
- Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName,
flowExecutionId_2));
- Assert.assertFalse(this.mysqlDagActionStore.exists(flowGroup, flowName,
flowExecutionId_3));
+ Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName,
flowExecutionId, DagActionStore.FlowActionType.KILL));
+ Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName,
flowExecutionId, DagActionStore.FlowActionType.RESUME));
+ Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName,
flowExecutionId_2, DagActionStore.FlowActionType.KILL));
+ Assert.assertFalse(this.mysqlDagActionStore.exists(flowGroup, flowName,
flowExecutionId_3, DagActionStore.FlowActionType.RESUME));
+ Assert.assertFalse(this.mysqlDagActionStore.exists(flowGroup, flowName,
flowExecutionId_3, DagActionStore.FlowActionType.KILL));
}
@Test(dependsOnMethods = "testExists")
- public void testGetAction() throws IOException, SQLException {
- Assert.assertEquals(new DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, DagActionStore.DagActionValue.KILL),
this.mysqlDagActionStore.getDagAction(flowGroup, flowName, flowExecutionId));
- Assert.assertEquals(new DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId_2, DagActionStore.DagActionValue.RESUME),
this.mysqlDagActionStore.getDagAction(flowGroup, flowName, flowExecutionId_2));
+ public void testGetActions() throws IOException {
Collection<DagActionStore.DagAction> dagActions =
this.mysqlDagActionStore.getDagActions();
- Assert.assertEquals(2, dagActions.size());
+ Assert.assertEquals(3, dagActions.size());
HashSet<DagActionStore.DagAction> set = new HashSet<>();
- set.add(new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.DagActionValue.KILL));
- set.add(new DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId_2, DagActionStore.DagActionValue.RESUME));
+ set.add(new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.KILL));
+ set.add(new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.RESUME));
+ set.add(new DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId_2, DagActionStore.FlowActionType.KILL));
Assert.assertEquals(dagActions, set);
}
- @Test(dependsOnMethods = "testGetAction")
+ @Test(dependsOnMethods = "testGetActions")
public void testDeleteAction() throws IOException, SQLException {
- this.mysqlDagActionStore.deleteDagAction(flowGroup, flowName,
flowExecutionId);
- Assert.assertEquals(this.mysqlDagActionStore.getDagActions().size(), 1);
- Assert.assertFalse(this.mysqlDagActionStore.exists(flowGroup, flowName,
flowExecutionId));
- Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName,
flowExecutionId_2));
- Assert.assertNull( this.mysqlDagActionStore.getDagAction(flowGroup,
flowName, flowExecutionId));
+ this.mysqlDagActionStore.deleteDagAction(
+ new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.KILL));
+ Assert.assertEquals(this.mysqlDagActionStore.getDagActions().size(), 2);
+ Assert.assertFalse(this.mysqlDagActionStore.exists(flowGroup, flowName,
flowExecutionId, DagActionStore.FlowActionType.KILL));
+ Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName,
flowExecutionId, DagActionStore.FlowActionType.RESUME));
+ Assert.assertTrue(this.mysqlDagActionStore.exists(flowGroup, flowName,
flowExecutionId_2, DagActionStore.FlowActionType.KILL));
}
}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
index 0a3640da5..03081b3ba 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
@@ -43,6 +43,9 @@ public class GobblinServiceConfiguration {
@Getter
private final boolean isWarmStandbyEnabled;
+ @Getter
+ private final boolean isMultiActiveSchedulerEnabled;
+
@Getter
private final boolean isTopologyCatalogEnabled;
@@ -107,6 +110,7 @@ public class GobblinServiceConfiguration {
}
this.isWarmStandbyEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY, false);
+ this.isMultiActiveSchedulerEnabled = ConfigUtils.getBoolean(config,
ServiceConfigKeys.GOBBLIN_SERVICE_MULTI_ACTIVE_SCHEDULER_ENABLED_KEY, false);
this.isHelixManagerEnabled =
config.hasPath(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY);
this.isDagManagerEnabled =
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 642f78f11..c0f140a9f 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -20,7 +20,10 @@ package org.apache.gobblin.service.modules.core;
import java.util.Objects;
import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
import org.apache.gobblin.runtime.dag_action_store.MysqlDagActionStore;
+import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import
org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby;
import
org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandlerWithWarmStandby;
@@ -147,6 +150,9 @@ public class GobblinServiceGuiceModule implements Module {
binder.bindConstant()
.annotatedWith(Names.named(InjectionNames.WARM_STANDBY_ENABLED))
.to(serviceConfig.isWarmStandbyEnabled());
+ binder.bindConstant()
+
.annotatedWith(Names.named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED))
+ .to(serviceConfig.isMultiActiveSchedulerEnabled());
OptionalBinder.newOptionalBinder(binder, DagActionStore.class);
if (serviceConfig.isWarmStandbyEnabled()) {
binder.bind(DagActionStore.class).to(MysqlDagActionStore.class);
@@ -159,6 +165,12 @@ public class GobblinServiceGuiceModule implements Module {
binder.bind(FlowExecutionResourceHandler.class).to(GobblinServiceFlowExecutionResourceHandler.class);
}
+ OptionalBinder.newOptionalBinder(binder, MultiActiveLeaseArbiter.class);
+ OptionalBinder.newOptionalBinder(binder, FlowTriggerHandler.class);
+ if (serviceConfig.isMultiActiveSchedulerEnabled()) {
+ binder.bind(MysqlMultiActiveLeaseArbiter.class);
+ binder.bind(FlowTriggerHandler.class);
+ }
binder.bind(FlowConfigsResource.class);
binder.bind(FlowConfigsV2Resource.class);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index f47cbde50..80da8a9e9 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -277,8 +277,9 @@ public class DagManager extends AbstractIdleService {
* @param dag {@link Dag} to be added
* @param persist whether to persist the dag to the {@link DagStateStore}
* @param setStatus if true, set all jobs in the dag to pending
+ * Note this should only be called from the {@link Orchestrator} or {@link
org.apache.gobblin.service.monitoring.DagActionStoreChangeMonitor}
*/
- synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist, boolean
setStatus) throws IOException {
+ public synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist,
boolean setStatus) throws IOException {
if (persist) {
//Persist the dag
this.dagStateStore.writeCheckpoint(dag);
@@ -425,7 +426,7 @@ public class DagManager extends AbstractIdleService {
if (dagActionStore.isPresent()) {
Collection<DagActionStore.DagAction> dagActions =
dagActionStore.get().getDagActions();
for (DagActionStore.DagAction action : dagActions) {
- switch (action.getDagActionValue()) {
+ switch (action.getFlowActionType()) {
case KILL:
this.handleKillFlowEvent(new
KillFlowEvent(action.getFlowGroup(), action.getFlowName(),
Long.parseLong(action.getFlowExecutionId())));
break;
@@ -433,7 +434,7 @@ public class DagManager extends AbstractIdleService {
this.handleResumeFlowEvent(new
ResumeFlowEvent(action.getFlowGroup(), action.getFlowName(),
Long.parseLong(action.getFlowExecutionId())));
break;
default:
- log.warn("Unsupported dagAction: " +
action.getDagActionValue().toString());
+ log.warn("Unsupported dagAction: " +
action.getFlowActionType().toString());
}
}
}
@@ -578,9 +579,10 @@ public class DagManager extends AbstractIdleService {
}
}
- private void clearUpDagAction(DagId dagId) throws IOException {
+ private void removeDagActionFromStore(DagId dagId,
DagActionStore.FlowActionType flowActionType) throws IOException {
if (this.dagActionStore.isPresent()) {
- this.dagActionStore.get().deleteDagAction(dagId.flowGroup,
dagId.flowName, dagId.flowExecutionId);
+ this.dagActionStore.get().deleteDagAction(
+ new DagActionStore.DagAction(dagId.flowGroup, dagId.flowName,
dagId.flowExecutionId, flowActionType));
}
}
@@ -592,13 +594,13 @@ public class DagManager extends AbstractIdleService {
String dagId= dagIdToResume.toString();
if (!this.failedDagIds.contains(dagId)) {
log.warn("No dag found with dagId " + dagId + ", so cannot resume
flow");
- clearUpDagAction(dagIdToResume);
+ removeDagActionFromStore(dagIdToResume,
DagActionStore.FlowActionType.RESUME);
return;
}
Dag<JobExecutionPlan> dag = this.failedDagStateStore.getDag(dagId);
if (dag == null) {
log.error("Dag " + dagId + " was found in memory but not found in
failed dag state store");
- clearUpDagAction(dagIdToResume);
+ removeDagActionFromStore(dagIdToResume,
DagActionStore.FlowActionType.RESUME);
return;
}
@@ -649,7 +651,7 @@ public class DagManager extends AbstractIdleService {
if (dagReady) {
this.dagStateStore.writeCheckpoint(dag.getValue());
this.failedDagStateStore.cleanUp(dag.getValue());
- clearUpDagAction(DagManagerUtils.generateDagId(dag.getValue()));
+
removeDagActionFromStore(DagManagerUtils.generateDagId(dag.getValue()),
DagActionStore.FlowActionType.RESUME);
this.failedDagIds.remove(dag.getKey());
this.resumingDags.remove(dag.getKey());
initialize(dag.getValue());
@@ -678,7 +680,8 @@ public class DagManager extends AbstractIdleService {
} else {
log.warn("Did not find Dag with id {}, it might be already
cancelled/finished.", dagToCancel);
}
- clearUpDagAction(dagId);
+ // Called after a KILL request is received
+ removeDagActionFromStore(dagId, DagActionStore.FlowActionType.KILL);
}
private void cancelDagNode(DagNode<JobExecutionPlan> dagNodeToCancel)
throws ExecutionException, InterruptedException {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
new file mode 100644
index 000000000..42ab0af96
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Random;
+
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.api.MysqlMultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.scheduler.JobScheduler;
+import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Handler used to coordinate multiple hosts with enabled schedulers to
respond to flow action events. It uses the
+ * {@link MysqlMultiActiveLeaseArbiter} to determine a single lease owner at a
given time
+ * for a flow action event. After acquiring the lease, it persists the flow
action event to the {@link DagActionStore}
+ * to be eventually acted upon by the host with the active DagManager. Once it
has completed this action, it will mark
+ * the lease as completed by calling the
+ * {@link MysqlMultiActiveLeaseArbiter.recordLeaseSuccess()} method. Hosts
that do not gain the lease for the event,
+ * instead schedule a reminder using the {@link SchedulerService} to check
back in on the previous lease owner's
+ * completion status after the lease should expire to ensure the event is
handled in failure cases.
+ */
+@Slf4j
+public class FlowTriggerHandler {
+ private final int schedulerMaxBackoffMillis;
+ private static Random random = new Random();
+ protected MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+ protected SchedulerService schedulerService;
+ protected DagActionStore dagActionStore;
+ private MetricContext metricContext;
+ private ContextAwareMeter numFlowsSubmitted;
+
+ @Inject
+ public FlowTriggerHandler(Config config, MultiActiveLeaseArbiter
leaseDeterminationStore,
+ SchedulerService schedulerService, DagActionStore dagActionStore) {
+ this.schedulerMaxBackoffMillis = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_MAX_BACKOFF_MILLIS_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS);
+ this.multiActiveLeaseArbiter = leaseDeterminationStore;
+ this.schedulerService = schedulerService;
+ this.dagActionStore = dagActionStore;
+ this.metricContext = Instrumented.getMetricContext(new
org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
+ this.getClass());
+ this.numFlowsSubmitted =
metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_FLOW_TRIGGER_HANDLER_NUM_FLOWS_SUBMITTED);
+ }
+
+ /**
+ * This method is used in the multi-active scheduler case for one or more
hosts to respond to a flow action event
+ * by attempting a lease for the flow event and processing the result
depending on the status of the attempt.
+ * @param jobProps
+ * @param flowAction
+ * @param eventTimeMillis
+ * @throws IOException
+ */
+ public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction
flowAction, long eventTimeMillis)
+ throws IOException {
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
+ multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
+ // TODO: add a log event or metric for each of these cases
+ if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
+ if (persistFlowAction(leaseObtainedStatus)) {
+ return;
+ }
+ // If persisting the flow action failed, then we set another trigger for
this event to occur immediately to
+ // re-attempt handling the event
+ scheduleReminderForEvent(jobProps, new
MultiActiveLeaseArbiter.LeasedToAnotherStatus(flowAction,
+ leaseObtainedStatus.getEventTimestamp(), 0L), eventTimeMillis);
+ return;
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+ scheduleReminderForEvent(jobProps,
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus,
+ eventTimeMillis);
+ return;
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+ return;
+ }
+ throw new RuntimeException(String.format("Received type of
leaseAttemptStatus: %s not handled by this method",
+ leaseAttemptStatus.getClass().getName()));
+ }
+
+ // Called after obtaining a lease to persist the flow action to {@link
DagActionStore} and mark the lease as done
+ private boolean
persistFlowAction(MultiActiveLeaseArbiter.LeaseObtainedStatus leaseStatus) {
+ try {
+ DagActionStore.DagAction flowAction = leaseStatus.getFlowAction();
+ this.dagActionStore.addDagAction(flowAction.getFlowGroup(),
flowAction.getFlowName(),
+ flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
+ // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
+ this.numFlowsSubmitted.mark();
+ return this.multiActiveLeaseArbiter.recordLeaseSuccess(leaseStatus);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * This method is used by {@link FlowTriggerHandler.handleTriggerEvent} to
schedule a self-reminder to check on
+ * the other participant's progress to finish acting on a flow action after
the time the lease should expire.
+ * @param jobProps
+ * @param status used to extract event to be reminded for and the minimum
time after which reminder should occur
+ * @param originalEventTimeMillis the event timestamp we were originally
handling
+ */
+ private void scheduleReminderForEvent(Properties jobProps,
MultiActiveLeaseArbiter.LeasedToAnotherStatus status,
+ long originalEventTimeMillis) {
+ DagActionStore.DagAction flowAction = status.getFlowAction();
+ // Add a small randomization to the minimum reminder wait time to avoid
'thundering herd' issue
+ String cronExpression =
createCronFromDelayPeriod(status.getMinimumLingerDurationMillis()
+ + random.nextInt(schedulerMaxBackoffMillis));
+ jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
+ // Ensure we save the event timestamp that we're setting reminder for to
have for debugging purposes
+ // in addition to the event we want to initiate
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
+ String.valueOf(status.getEventTimeMillis()));
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
+ String.valueOf(originalEventTimeMillis));
+ JobKey key = new JobKey(flowAction.getFlowName(),
flowAction.getFlowGroup());
+ // Create a new trigger for the flow in job scheduler that is set to fire
at the minimum reminder wait time calculated
+ Trigger trigger = JobScheduler.createTriggerForJob(key, jobProps);
+ try {
+ log.info("Flow Trigger Handler - [%s, eventTimestamp: %s] - attempting
to schedule reminder for event %s in %s millis",
+ flowAction, originalEventTimeMillis, status.getEventTimeMillis(),
trigger.getNextFireTime());
+ this.schedulerService.getScheduler().scheduleJob(trigger);
+ } catch (SchedulerException e) {
+ log.warn("Failed to add job reminder due to SchedulerException for job
%s trigger event %s ", key, status.getEventTimeMillis(), e);
+ }
+ log.info(String.format("Flow Trigger Handler - [%s, eventTimestamp: %s] -
SCHEDULED REMINDER for event %s in %s millis",
+ flowAction, originalEventTimeMillis, status.getEventTimeMillis(),
trigger.getNextFireTime()));
+ }
+
+ /**
+ * These methods should only be called from the Orchestrator or JobScheduler
classes as it directly adds jobs to the
+ * Quartz scheduler
+ * @param delayPeriodMillis
+ * @return
+ */
+ protected static String createCronFromDelayPeriod(long delayPeriodMillis) {
+ LocalDateTime now = LocalDateTime.now(ZoneId.of("UTC"));
+ LocalDateTime timeToScheduleReminder = now.plus(delayPeriodMillis,
ChronoUnit.MILLIS);
+ // TODO: investigate potentially better way of generating cron expression
that does not make it US dependent
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("ss mm HH dd MM
? yyyy", Locale.US);
+ return timeToScheduleReminder.format(formatter);
+ }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index c9a21560c..a0c196789 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -56,6 +56,7 @@ import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
@@ -103,7 +104,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
private FlowStatusGenerator flowStatusGenerator;
private UserQuotaManager quotaManager;
-
+ private Optional<FlowTriggerHandler> flowTriggerHandler;
private final ClassAliasResolver<SpecCompiler> aliasResolver;
@@ -111,13 +112,13 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
public Orchestrator(Config config, Optional<TopologyCatalog>
topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log,
- FlowStatusGenerator flowStatusGenerator, boolean instrumentationEnabled)
{
+ FlowStatusGenerator flowStatusGenerator, boolean instrumentationEnabled,
Optional<FlowTriggerHandler> flowTriggerHandler) {
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
this.topologyCatalog = topologyCatalog;
this.dagManager = dagManager;
this.flowStatusGenerator = flowStatusGenerator;
-
+ this.flowTriggerHandler = flowTriggerHandler;
try {
String specCompilerClassName =
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
if
(config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
@@ -160,8 +161,8 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
@Inject
public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator,
Optional<TopologyCatalog> topologyCatalog,
- Optional<DagManager> dagManager, Optional<Logger> log) {
- this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true);
+ Optional<DagManager> dagManager, Optional<Logger> log,
Optional<FlowTriggerHandler> flowTriggerHandler) {
+ this(config, topologyCatalog, dagManager, log, flowStatusGenerator, true,
flowTriggerHandler);
}
@@ -222,7 +223,7 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
}
- public void orchestrate(Spec spec) throws Exception {
+ public void orchestrate(Spec spec, Properties jobProps, long
triggerTimestampMillis) throws Exception {
// Add below waiting because TopologyCatalog and FlowCatalog service can
be launched at the same time
this.topologyCatalog.get().getInitComplete().await();
@@ -310,19 +311,28 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
flowCompilationTimer.get().stop(flowMetadata);
}
- if (this.dagManager.isPresent()) {
- try {
- //Send the dag to the DagManager.
- this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
- } catch (Exception ex) {
+ // If multi-active scheduler is enabled do not pass onto DagManager,
otherwise scheduler forwards it directly
+ if (flowTriggerHandler.isPresent()) {
+ // If triggerTimestampMillis is 0, then it was not set by the job
trigger handler, and we cannot handle this event
+ if (triggerTimestampMillis == 0L) {
+ _log.warn("Skipping execution of spec: {} because missing trigger
timestamp in job properties",
+ jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, "Flow orchestration
skipped because no trigger timestamp "
+ + "associated with flow action.");
if (this.eventSubmitter.isPresent()) {
- // pronounce failed before stack unwinds, to ensure flow not
marooned in `COMPILED` state; (failure likely attributable to DB
connection/failover)
- String failureMessage = "Failed to add Job Execution Plan due to:
" + ex.getMessage();
- flowMetadata.put(TimingEvent.METADATA_MESSAGE, failureMessage);
new TimingEvent(this.eventSubmitter.get(),
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
}
- throw ex;
+ return;
}
+
+ String flowExecutionId =
flowMetadata.get(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD);
+ DagActionStore.DagAction flowAction =
+ new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.LAUNCH);
+ flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction,
triggerTimestampMillis);
+ _log.info("Multi-active scheduler finished handling trigger event:
[%s, triggerEventTimestamp: %s]", flowAction,
+ triggerTimestampMillis);
+ } else if (this.dagManager.isPresent()) {
+ submitFlowToDagManager((FlowSpec) spec, jobExecutionPlanDag);
} else {
// Schedule all compiled JobSpecs on their respective Executor
for (Dag.DagNode<JobExecutionPlan> dagNode :
jobExecutionPlanDag.getNodes()) {
@@ -364,6 +374,28 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() -
startTime, TimeUnit.NANOSECONDS);
}
+ public void submitFlowToDagManager(FlowSpec flowSpec)
+ throws IOException {
+ submitFlowToDagManager(flowSpec, specCompiler.compileFlow(flowSpec));
+ }
+
+ public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan>
jobExecutionPlanDag)
+ throws IOException {
+ try {
+ //Send the dag to the DagManager.
+ this.dagManager.get().addDag(jobExecutionPlanDag, true, true);
+ } catch (Exception ex) {
+ if (this.eventSubmitter.isPresent()) {
+ // pronounce failed before stack unwinds, to ensure flow not marooned
in `COMPILED` state; (failure likely attributable to DB connection/failover)
+ String failureMessage = "Failed to add Job Execution Plan due to: " +
ex.getMessage();
+ Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(flowSpec);
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, failureMessage);
+ new TimingEvent(this.eventSubmitter.get(),
TimingEvent.FlowTimings.FLOW_FAILED).stop(flowMetadata);
+ }
+ throw ex;
+ }
+ }
+
/**
* Check if a FlowSpec instance is allowed to run.
*
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
index 65b464c88..99661305f 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
@@ -30,8 +30,8 @@ import
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.util.ConfigUtils;
-class TimingEventUtils {
- static Map<String, String> getFlowMetadata(FlowSpec flowSpec) {
+public class TimingEventUtils {
+ public static Map<String, String> getFlowMetadata(FlowSpec flowSpec) {
return getFlowMetadata(flowSpec.getConfig());
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
index 3919a3a7d..0b5d1cdc7 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java
@@ -30,7 +30,6 @@ import java.sql.SQLException;
import javax.inject.Named;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.runtime.api.DagActionStore;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.util.InjectionNames;
import org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
import org.apache.gobblin.service.modules.core.GobblinServiceManager;
@@ -54,34 +53,29 @@ public class
GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends G
String flowName = key.getKey().getFlowName();
Long flowExecutionId = key.getKey().getFlowExecutionId();
try {
- // If an existing resume or kill request is still pending then do not
accept this request
- if (this.dagActionStore.exists(flowGroup, flowName,
flowExecutionId.toString())) {
- DagActionStore.DagActionValue action =
this.dagActionStore.getDagAction(flowGroup, flowName,
flowExecutionId.toString()).getDagActionValue();
- this.handleException(flowGroup, flowName, flowExecutionId.toString(),
- new RuntimeException("There is already a pending action " + action
+ " for this flow. Please wait to resubmit and wait for"
- + " action to be completed."));
+ // If an existing resume request is still pending then do not accept
this request
+ if (this.dagActionStore.exists(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME)) {
+ this.prepareError("There is already a pending RESUME action for this
flow. Please wait to resubmit and wait "
+ + "for action to be completed.", HttpStatus.S_409_CONFLICT);
return;
}
- this.dagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.DagActionValue.RESUME);
- } catch (IOException | SQLException | SpecNotFoundException e) {
+ this.dagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.FlowActionType.RESUME);
+ } catch (IOException | SQLException e) {
log.warn(
String.format("Failed to add execution resume action for flow %s %s
%s to dag action store due to", flowGroup,
flowName, flowExecutionId), e);
- this.handleException(flowGroup, flowName, flowExecutionId.toString(), e);
+ this.prepareError(e.getMessage(),
HttpStatus.S_500_INTERNAL_SERVER_ERROR);
}
}
- private void handleException (String flowGroup, String flowName, String
flowExecutionId, Exception e) {
- try {
- if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId)) {
- throw new RestLiServiceException(HttpStatus.S_409_CONFLICT,
e.getMessage());
- } else {
- throw new
RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
- }
- } catch (IOException | SQLException ex) {
- throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR,
e.getMessage());
+ private void prepareError(String exceptionMessage, HttpStatus errorType) {
+ if (errorType == HttpStatus.S_409_CONFLICT) {
+ throw new RestLiServiceException(HttpStatus.S_409_CONFLICT,
exceptionMessage);
+ } else if (errorType == HttpStatus.S_400_BAD_REQUEST) {
+ throw new RestLiServiceException(HttpStatus.S_400_BAD_REQUEST);
}
+ throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR,
exceptionMessage);
}
@Override
@@ -91,21 +85,16 @@ public class
GobblinServiceFlowExecutionResourceHandlerWithWarmStandby extends G
String flowName = key.getKey().getFlowName();
Long flowExecutionId = key.getKey().getFlowExecutionId();
try {
- // If an existing resume or kill request is still pending then do not
accept this request
- if (this.dagActionStore.exists(flowGroup, flowName,
flowExecutionId.toString())) {
- DagActionStore.DagActionValue action =
this.dagActionStore.getDagAction(flowGroup, flowName,
flowExecutionId.toString()).getDagActionValue();
- this.handleException(flowGroup, flowName, flowExecutionId.toString(),
- new RuntimeException("There is already a pending " + action + "
action for this flow. Please wait to resubmit and wait for"
- + " action to be completed."));
- return new UpdateResponse(HttpStatus.S_400_BAD_REQUEST);
+ // If an existing kill request is still pending then do not accept this
request
+ if (this.dagActionStore.exists(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.FlowActionType.KILL)) {
+ this.prepareError("There is already a pending KILL action for this
flow. Please wait to resubmit and wait "
+ + "for action to be completed.", HttpStatus.S_400_BAD_REQUEST);
}
- this.dagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.DagActionValue.KILL);
+ this.dagActionStore.addDagAction(flowGroup, flowName,
flowExecutionId.toString(), DagActionStore.FlowActionType.KILL);
return new UpdateResponse(HttpStatus.S_200_OK);
- } catch (IOException | SQLException | SpecNotFoundException e) {
- log.warn(
- String.format("Failed to add execution delete action for flow %s %s
%s to dag action store due to", flowGroup,
- flowName, flowExecutionId), e);
- this.handleException(flowGroup, flowName, flowExecutionId.toString(), e);
+ } catch (IOException | SQLException e) {
+ this.prepareError(String.format("Failed to add execution delete action
for flow %s %s %s to dag action store due to", flowGroup,
+ flowName, flowExecutionId), HttpStatus.S_500_INTERNAL_SERVER_ERROR);
return new UpdateResponse(HttpStatus.S_500_INTERNAL_SERVER_ERROR);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 5ee5f9789..b62a869ba 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -79,6 +79,7 @@ import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
@@ -108,6 +109,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
protected final Orchestrator orchestrator;
protected final Boolean warmStandbyEnabled;
protected final Optional<UserQuotaManager> quotaManager;
+ protected final Optional<FlowTriggerHandler> flowTriggerHandler;
@Getter
protected final Map<String, Spec> scheduledFlowSpecs;
@Getter
@@ -163,7 +165,8 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
Config config,
Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog,
Optional<TopologyCatalog> topologyCatalog,
Orchestrator orchestrator, SchedulerService schedulerService,
Optional<UserQuotaManager> quotaManager, Optional<Logger> log,
- @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean warmStandbyEnabled)
throws Exception {
+ @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean warmStandbyEnabled,
+ Optional<FlowTriggerHandler> flowTriggerHandler) throws Exception {
super(ConfigUtils.configToProperties(config), schedulerService);
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
@@ -179,6 +182,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
&& config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED);
this.warmStandbyEnabled = warmStandbyEnabled;
this.quotaManager = quotaManager;
+ this.flowTriggerHandler = flowTriggerHandler;
// Check that these metrics do not exist before adding, mainly for testing
purpose which creates multiple instances
// of the scheduler. If one metric exists, then the others should as well.
MetricFilter filter =
MetricFilter.contains(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_GET_SPECS_DURING_STARTUP_PER_SPEC_RATE_NANOS);
@@ -198,11 +202,13 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
}
public GobblinServiceJobScheduler(String serviceName, Config config,
FlowStatusGenerator flowStatusGenerator,
- Optional<HelixManager> helixManager,
- Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog>
topologyCatalog, Optional<DagManager> dagManager, Optional<UserQuotaManager>
quotaManager,
- SchedulerService schedulerService, Optional<Logger> log, boolean
warmStandbyEnabled) throws Exception {
+ Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog,
Optional<TopologyCatalog> topologyCatalog,
+ Optional<DagManager> dagManager, Optional<UserQuotaManager>
quotaManager, SchedulerService schedulerService,
+ Optional<Logger> log, boolean warmStandbyEnabled, Optional
<FlowTriggerHandler> flowTriggerHandler)
+ throws Exception {
this(serviceName, config, helixManager, flowCatalog, topologyCatalog,
- new Orchestrator(config, flowStatusGenerator, topologyCatalog,
dagManager, log), schedulerService, quotaManager, log, warmStandbyEnabled);
+ new Orchestrator(config, flowStatusGenerator, topologyCatalog,
dagManager, log, flowTriggerHandler),
+ schedulerService, quotaManager, log, warmStandbyEnabled,
flowTriggerHandler);
}
public synchronized void setActive(boolean isActive) {
@@ -440,7 +446,9 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
public void runJob(Properties jobProps, JobListener jobListener) throws
JobException {
try {
Spec flowSpec =
this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
- this.orchestrator.orchestrate(flowSpec);
+ String triggerTimestampMillis = jobProps.getProperty(
+ ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
"0");
+ this.orchestrator.orchestrate(flowSpec, jobProps,
Long.parseLong(triggerTimestampMillis));
} catch (Exception e) {
throw new JobException("Failed to run Spec: " +
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 732697038..456851612 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -18,7 +18,8 @@
package org.apache.gobblin.service.monitoring;
import java.io.IOException;
-import java.sql.SQLException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -34,10 +35,14 @@ import
org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
/**
@@ -52,6 +57,7 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
// Metrics
private ContextAwareMeter killsInvoked;
private ContextAwareMeter resumesInvoked;
+ private ContextAwareMeter flowsLaunched;
private ContextAwareMeter unexpectedErrors;
private ContextAwareMeter messageProcessedMeter;
private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from
all partitions in one gauge
@@ -71,17 +77,23 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
protected DagActionStore dagActionStore;
protected DagManager dagManager;
+ protected Orchestrator orchestrator;
+ protected boolean isMultiActiveSchedulerEnabled;
+ protected FlowCatalog flowCatalog;
// Note that the topic is an empty string (rather than null to avoid NPE)
because this monitor relies on the consumer
// client itself to determine all Kafka related information dynamically
rather than through the config.
public DagActionStoreChangeMonitor(String topic, Config config,
DagActionStore dagActionStore, DagManager dagManager,
- int numThreads) {
+ int numThreads, FlowCatalog flowCatalog, Orchestrator orchestrator,
boolean isMultiActiveSchedulerEnabled) {
// Differentiate group id for each host
super(topic, config.withValue(GROUP_ID_KEY,
ConfigValueFactory.fromAnyRef(DAG_ACTION_CHANGE_MONITOR_PREFIX +
UUID.randomUUID().toString())),
numThreads);
this.dagActionStore = dagActionStore;
this.dagManager = dagManager;
+ this.flowCatalog = flowCatalog;
+ this.orchestrator = orchestrator;
+ this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
}
@Override
@@ -93,7 +105,7 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
@Override
/*
- This class is multi-threaded and this message will be called by multiple
threads, however any given message will be
+ This class is multithreaded and this method will be called by multiple
threads, however any given message will be
partitioned and processed by only one thread (and corresponding queue).
*/
protected void processMessage(DecodeableKafkaRecord message) {
@@ -109,6 +121,8 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
String flowName = value.getFlowName();
String flowExecutionId = value.getFlowExecutionId();
+ DagActionStore.FlowActionType dagActionType =
DagActionStore.FlowActionType.valueOf(value.getDagAction().toString());
+
produceToConsumeDelayValue = calcMillisSince(produceTimestamp);
log.debug("Processing Dag Action message for flow group: {} name: {}
executionId: {} tid: {} operation: {} lag: {}",
flowGroup, flowName, flowExecutionId, tid, operation,
produceToConsumeDelayValue);
@@ -119,47 +133,37 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
return;
}
- // Retrieve the Dag Action taken from MySQL table unless operation is
DELETE
- DagActionStore.DagActionValue dagAction = null;
- if (!operation.equals("DELETE")) {
- try {
- dagAction = dagActionStore.getDagAction(flowGroup, flowName,
flowExecutionId).getDagActionValue();
- } catch (IOException e) {
- log.error("Encountered IOException trying to retrieve dagAction for
flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup,
flowName, flowExecutionId, e);
- this.unexpectedErrors.mark();
- return;
- } catch (SpecNotFoundException e) {
- log.error("DagAction not found for flow group: {} name: {}
executionId: {} Exception: {}", flowGroup, flowName,
- flowExecutionId, e);
- this.unexpectedErrors.mark();
- return;
- } catch (SQLException throwables) {
- log.error("Encountered SQLException trying to retrieve dagAction for
flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup,
flowName, flowExecutionId, throwables);
- return;
- }
- }
-
- // We only expert INSERT and DELETE operations done to this table. INSERTs
correspond to resume or delete flow
- // requests that have to be processed. DELETEs require no action.
+ // We only expect INSERT and DELETE operations done to this table. INSERTs
correspond to any type of
+ // {@link DagActionStore.FlowActionType} flow requests that have to be
processed. DELETEs require no action.
try {
if (operation.equals("INSERT")) {
- if (dagAction.equals(DagActionStore.DagActionValue.RESUME)) {
+ if (dagActionType.equals(DagActionStore.FlowActionType.RESUME)) {
log.info("Received insert dag action and about to send resume flow
request");
dagManager.handleResumeFlowRequest(flowGroup,
flowName,Long.parseLong(flowExecutionId));
this.resumesInvoked.mark();
- } else if (dagAction.equals(DagActionStore.DagActionValue.KILL)) {
+ } else if (dagActionType.equals(DagActionStore.FlowActionType.KILL)) {
log.info("Received insert dag action and about to send kill flow
request");
dagManager.handleKillFlowRequest(flowGroup, flowName,
Long.parseLong(flowExecutionId));
this.killsInvoked.mark();
+ } else if (dagActionType.equals(DagActionStore.FlowActionType.LAUNCH))
{
+ // If multi-active scheduler is NOT turned on we should not receive
these types of events
+ if (!this.isMultiActiveSchedulerEnabled) {
+ this.unexpectedErrors.mark();
+ throw new RuntimeException(String.format("Received LAUNCH
dagAction while not in multi-active scheduler "
+ + "mode for flowAction: %s",
+ new DagActionStore.DagAction(flowGroup, flowName,
flowExecutionId, dagActionType)));
+ }
+ log.info("Received insert dag action and about to forward launch
request to DagManager");
+ submitFlowToDagManagerHelper(flowGroup, flowName);
} else {
- log.warn("Received unsupported dagAction {}. Expected to be a KILL
or RESUME", dagAction);
+ log.warn("Received unsupported dagAction {}. Expected to be a KILL,
RESUME, or LAUNCH", dagActionType);
this.unexpectedErrors.mark();
return;
}
} else if (operation.equals("UPDATE")) {
log.warn("Received an UPDATE action to the DagActionStore when values
in this store are never supposed to be "
+ "updated. Flow group: {} name {} executionId {} were updated to
action {}", flowGroup, flowName,
- flowExecutionId, dagAction);
+ flowExecutionId, dagActionType);
this.unexpectedErrors.mark();
} else if (operation.equals("DELETE")) {
log.debug("Deleted flow group: {} name: {} executionId {} from
DagActionStore", flowGroup, flowName, flowExecutionId);
@@ -177,15 +181,40 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
dagActionsSeenCache.put(changeIdentifier, changeIdentifier);
}
+ protected void submitFlowToDagManagerHelper(String flowGroup, String
flowName) {
+ // Retrieve job execution plan by recompiling the flow spec to send to the
DagManager
+ FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
+ FlowSpec spec = null;
+ try {
+ URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
+ spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+ this.orchestrator.submitFlowToDagManager(spec);
+ } catch (URISyntaxException e) {
+ log.warn("Could not create URI object for flowId {} due to error {}",
flowId, e.getMessage());
+ this.unexpectedErrors.mark();
+ return;
+ } catch (SpecNotFoundException e) {
+ log.warn("Spec not found for flow group: {} name: {} Exception: {}",
flowGroup, flowName, e);
+ this.unexpectedErrors.mark();
+ return;
+ } catch (IOException e) {
+ log.warn("Failed to add Job Execution Plan for flow group: {} name: {}
due to error {}", flowGroup, flowName, e);
+ this.unexpectedErrors.mark();
+ return;
+ }
+ // Only mark this if the dag was successfully added
+ this.flowsLaunched.mark();
+ }
+
@Override
protected void createMetrics() {
super.createMetrics();
this.killsInvoked =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED);
this.resumesInvoked =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
+ this.flowsLaunched =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED);
this.unexpectedErrors =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
this.messageProcessedMeter =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED);
this.produceToConsumeDelayMillis =
this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS,
() -> produceToConsumeDelayValue);
this.getMetricContext().register(this.produceToConsumeDelayMillis);
}
-
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
index d4a0656b3..5806949a8 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitorFactory.java
@@ -22,11 +22,15 @@ import java.util.Objects;
import com.typesafe.config.Config;
import javax.inject.Inject;
+import javax.inject.Named;
import javax.inject.Provider;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.runtime.util.InjectionNames;
import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.util.ConfigUtils;
@@ -40,12 +44,20 @@ public class DagActionStoreChangeMonitorFactory implements
Provider<DagActionSto
private final Config config;
private DagActionStore dagActionStore;
private DagManager dagManager;
+ private FlowCatalog flowCatalog;
+ private Orchestrator orchestrator;
+ private boolean isMultiActiveSchedulerEnabled;
@Inject
- public DagActionStoreChangeMonitorFactory(Config config, DagActionStore
dagActionStore, DagManager dagManager) {
+ public DagActionStoreChangeMonitorFactory(Config config, DagActionStore
dagActionStore, DagManager dagManager,
+ FlowCatalog flowCatalog, Orchestrator orchestrator,
+ @Named(InjectionNames.MULTI_ACTIVE_SCHEDULER_ENABLED) boolean
isMultiActiveSchedulerEnabled) {
this.config = Objects.requireNonNull(config);
this.dagActionStore = dagActionStore;
this.dagManager = dagManager;
+ this.flowCatalog = flowCatalog;
+ this.orchestrator = orchestrator;
+ this.isMultiActiveSchedulerEnabled = isMultiActiveSchedulerEnabled;
}
private DagActionStoreChangeMonitor createDagActionStoreMonitor()
@@ -56,7 +68,8 @@ public class DagActionStoreChangeMonitorFactory implements
Provider<DagActionSto
String topic = ""; // Pass empty string because we expect underlying
client to dynamically determine the Kafka topic
int numThreads = ConfigUtils.getInt(dagActionStoreChangeConfig,
DAG_ACTION_STORE_CHANGE_MONITOR_NUM_THREADS_KEY, 5);
- return new DagActionStoreChangeMonitor(topic, dagActionStoreChangeConfig,
this.dagActionStore, this.dagManager, numThreads);
+ return new DagActionStoreChangeMonitor(topic, dagActionStoreChangeConfig,
this.dagActionStore, this.dagManager,
+ numThreads, flowCatalog, orchestrator, isMultiActiveSchedulerEnabled);
}
@Override
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
index f63fa6624..4d2771368 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitorFactory.java
@@ -42,7 +42,7 @@ public class SpecStoreChangeMonitorFactory implements
Provider<SpecStoreChangeMo
private GobblinServiceJobScheduler scheduler;
@Inject
- public SpecStoreChangeMonitorFactory(Config config,FlowCatalog flowCatalog,
GobblinServiceJobScheduler scheduler) {
+ public SpecStoreChangeMonitorFactory(Config config, FlowCatalog flowCatalog,
GobblinServiceJobScheduler scheduler) {
this.config = Objects.requireNonNull(config);
this.flowCatalog = flowCatalog;
this.scheduler = scheduler;
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index facdca98c..e8c4fd443 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -84,13 +84,13 @@ public class DagManagerFlowTest {
.build();
dagActionStore = new MysqlDagActionStore(config);
- dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.DagActionValue.KILL);
- dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId_2,
DagActionStore.DagActionValue.RESUME);
+ dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId,
DagActionStore.FlowActionType.KILL);
+ dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId_2,
DagActionStore.FlowActionType.RESUME);
dagManager = new MockedDagManager(ConfigUtils.propertiesToConfig(props),
false);
dagManager.dagActionStore = Optional.of(dagActionStore);
dagManager.setActive(true);
this.dagNumThreads = dagManager.getNumThreads();
- Thread.sleep(10000);
+ Thread.sleep(30000);
// On active, should proceed request and delete action entry
Assert.assertEquals(dagActionStore.getDagActions().size(), 0);
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index b1cec2b3a..fafcc9605 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -79,6 +79,7 @@ public class OrchestratorTest {
private SpecCatalogListener mockListener;
private FlowSpec flowSpec;
private FlowStatusGenerator mockStatusGenerator;
+ private FlowTriggerHandler _mockFlowTriggerHandler;
private Orchestrator orchestrator;
@BeforeClass
@@ -107,9 +108,10 @@ public class OrchestratorTest {
this.serviceLauncher.addService(flowCatalog);
this.mockStatusGenerator = mock(FlowStatusGenerator.class);
+ this._mockFlowTriggerHandler = mock(FlowTriggerHandler.class);
this.orchestrator = new
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
- this.mockStatusGenerator,
- Optional.of(this.topologyCatalog), Optional.<DagManager>absent(),
Optional.of(logger));
+ this.mockStatusGenerator, Optional.of(this.topologyCatalog),
Optional.<DagManager>absent(), Optional.of(logger),
+ Optional.of(this._mockFlowTriggerHandler));
this.topologyCatalog.addListener(orchestrator);
this.flowCatalog.addListener(orchestrator);
// Start application
@@ -341,13 +343,13 @@ public class OrchestratorTest {
flowProps.put("gobblin.flow.destinationIdentifier", "destination");
flowProps.put("flow.allowConcurrentExecution", false);
FlowSpec adhocSpec = new FlowSpec(URI.create("flow0/group0"), "1", "",
ConfigUtils.propertiesToConfig(flowProps) , flowProps, Optional.absent(),
Optional.absent());
- this.orchestrator.orchestrate(adhocSpec);
+ this.orchestrator.orchestrate(adhocSpec, flowProps, 0);
String metricName =
MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, "group0",
"flow0", ServiceMetricNames.COMPILED);
Assert.assertNull(metricContext.getParent().get().getGauges().get(metricName));
flowProps.setProperty("job.schedule", "0/2 * * * * ?");
FlowSpec scheduledSpec = new FlowSpec(URI.create("flow0/group0"), "1", "",
ConfigUtils.propertiesToConfig(flowProps) , flowProps, Optional.absent(),
Optional.absent());
- this.orchestrator.orchestrate(scheduledSpec);
+ this.orchestrator.orchestrate(scheduledSpec, flowProps, 0);
Assert.assertNotNull(metricContext.getParent().get().getGauges().get(metricName));
}
}
\ No newline at end of file
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index 22f060bbd..6db4a83e3 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -60,6 +60,7 @@ import org.apache.gobblin.service.modules.flowgraph.Dag;
import
org.apache.gobblin.service.modules.orchestration.AbstractUserQuotaManager;
import
org.apache.gobblin.service.modules.orchestration.InMemoryUserQuotaManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
@@ -348,7 +349,8 @@ public class GobblinServiceJobSchedulerTest {
SchedulerService schedulerService = new SchedulerService(new Properties());
// Mock a GaaS scheduler not in warm standby mode
GobblinServiceJobScheduler scheduler = new
GobblinServiceJobScheduler("testscheduler",
- ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog),
null, mockOrchestrator, schedulerService, Optional.of(new
InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), false);
+ ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog),
null, mockOrchestrator, schedulerService, Optional.of(new
InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), false,
Optional.of(Mockito.mock(
+ FlowTriggerHandler.class)));
schedulerService.startAsync().awaitRunning();
scheduler.startUp();
@@ -366,7 +368,8 @@ public class GobblinServiceJobSchedulerTest {
//Mock a GaaS scheduler in warm standby mode, where we don't check quota
GobblinServiceJobScheduler schedulerWithWarmStandbyEnabled = new
GobblinServiceJobScheduler("testscheduler",
- ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog),
null, mockOrchestrator, schedulerService, Optional.of(new
InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), true);
+ ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog),
null, mockOrchestrator, schedulerService, Optional.of(new
InMemoryUserQuotaManager(quotaConfig)), Optional.absent(), true,
Optional.of(Mockito.mock(
+ FlowTriggerHandler.class)));
schedulerWithWarmStandbyEnabled.startUp();
schedulerWithWarmStandbyEnabled.setActive(true);
@@ -388,7 +391,8 @@ public class GobblinServiceJobSchedulerTest {
public TestGobblinServiceJobScheduler(String serviceName, Config config,
Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog>
topologyCatalog, Orchestrator orchestrator, Optional<UserQuotaManager>
quotaManager,
SchedulerService schedulerService, boolean isWarmStandbyEnabled)
throws Exception {
- super(serviceName, config, Optional.absent(), flowCatalog,
topologyCatalog, orchestrator, schedulerService, quotaManager,
Optional.absent(), isWarmStandbyEnabled);
+ super(serviceName, config, Optional.absent(), flowCatalog,
topologyCatalog, orchestrator, schedulerService, quotaManager,
Optional.absent(), isWarmStandbyEnabled, Optional.of(Mockito.mock(
+ FlowTriggerHandler.class)));
if (schedulerService != null) {
hasScheduler = true;
}