[
https://issues.apache.org/jira/browse/GOBBLIN-1837?focusedWorklogId=864378&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-864378
]
ASF GitHub Bot logged work on GOBBLIN-1837:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 08/Jun/23 08:49
Start Date: 08/Jun/23 08:49
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3700:
URL: https://github.com/apache/gobblin/pull/3700#discussion_r1222573977
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MySQLMultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import javax.sql.DataSource;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * MySQL based implementation of the {@link MultiActiveLeaseArbiter} which
uses a MySQL store to resolve ownership of
+ * a flow event amongst multiple competing instances. A MySQL table is used to
store flow identifying information as
+ * well as the flow action associated with it. It uses two additional values
of the `event_timestamp` and
+ * `lease_acquisition_timestamp` to indicate an active lease, expired lease,
and state of no longer leasing. The table
+ * schema is as follows:
+ * [flow_group | flow_name | flow_execution_id | flow_action | event_timestamp
| lease_acquisition_timestamp]
+ * (----------------------primary key------------------------)
+ * We also maintain another table in the database with two constants that
allow us to coordinate between instances and
+ * ensure they are using the same values to base their coordination off of.
+ * [epsilon | linger]
+ * `epsilon` - time within we consider to timestamps to be the same, to
account for between-host clock drift
+ * `linger` - minimum time to occur before another host may attempt a lease on
a flow event. It should be much greater
+ * than epsilon and encapsulate executor communication latency
including retry attempts
+ *
+ * The `event_timestamp` is the time of the flow_action event request.
+ * ---Event consolidation---
+ * Note that for the sake of simplification, we only allow one event
associated with a particular flow's flow_action
+ * (ie: only one LAUNCH for example of flow FOO, but there can be a LAUNCH,
KILL, & RESUME for flow FOO at once) during
+ * the time it takes to execute the flow action. In most cases, the execution
time should be so negligible that this
+ * event consolidation of duplicate flow action requests is not noticed and
even during executor downtime this behavior
+ * is acceptable as the user generally expects a timely execution of the most
recent request rather than one execution
+ * per request.
+ *
+ * The `lease_acquisition_timestamp` is the time a host acquired ownership of
this flow action, and it is valid for
+ * `linger` period of time after which it expires and any host can re-attempt
ownership. In most cases, the original
+ * host should actually complete its work while having the lease and then mark
the flow action as NULL to indicate no
+ * further leasing should be done for the event.
+ */
+public class MySQLMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
+ /** `j.u.Function` variant for an operation that may @throw IOException or
SQLException: preserves method signature checked exceptions */
+ @FunctionalInterface
+ protected interface CheckedFunction<T, R> {
+ R apply(T t) throws IOException, SQLException;
+ }
+
+ public static final String CONFIG_PREFIX = "MySQLMultiActiveLeaseArbiter";
+
+ protected final DataSource dataSource;
+ private final String leaseArbiterTableName;
+ private final String constantsTableName;
+ private final int epsilon;
+ private final int linger;
+ protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ + " AND flow_action=?";
+ protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ + " AND flow_action=? AND event_timestamp=? AND
lease_acquisition_timestamp=?";
+
+ protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT
ROW_COUNT() AS rows_inserted_count, "
+ + "lease_acquisition_timestamp, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+
+ // Does a cross join between the two tables to have epsilon and linger
values available. Returns the following values:
+ // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if
event_timestamp in table is within
+ // epsilon), leaseValidityStatus (1 if lease has not expired, 2 if expired,
3 if column is NULL or no longer leasing)
+ protected static final String GET_EVENT_INFO_STATEMENT = "SELECT
event_timestamp, lease_acquisition_timestamp, "
+ + "abs(event_timestamp - ?) <= epsilon as isWithinEpsilon, CASE "
+ + "WHEN CURRENT_TIMESTAMP < (lease_acquisition_timestamp + linger) then
1"
+ + "WHEN CURRENT_TIMESTAMP >= (lease_acquisition_timestamp + linger) then
2"
+ + "ELSE 3 END as leaseValidityStatus, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
Review Comment:
what about:
```
(lease_acquisition_timestamp + linger) < CURRENT_TIMESTAMP as isLeaseExpired
```
then you either have:
1 (TRUE) - expired
0 (FALSE) - not expired
NULL - no lease
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MySQLMultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import javax.sql.DataSource;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * MySQL based implementation of the {@link MultiActiveLeaseArbiter} which
uses a MySQL store to resolve ownership of
Review Comment:
nice clear and informative javadoc!
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MySQLMultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import javax.sql.DataSource;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * MySQL based implementation of the {@link MultiActiveLeaseArbiter} which
uses a MySQL store to resolve ownership of
+ * a flow event amongst multiple competing instances. A MySQL table is used to
store flow identifying information as
+ * well as the flow action associated with it. It uses two additional values
of the `event_timestamp` and
+ * `lease_acquisition_timestamp` to indicate an active lease, expired lease,
and state of no longer leasing. The table
+ * schema is as follows:
+ * [flow_group | flow_name | flow_execution_id | flow_action | event_timestamp
| lease_acquisition_timestamp]
+ * (----------------------primary key------------------------)
+ * We also maintain another table in the database with two constants that
allow us to coordinate between instances and
+ * ensure they are using the same values to base their coordination off of.
+ * [epsilon | linger]
+ * `epsilon` - time within we consider to timestamps to be the same, to
account for between-host clock drift
+ * `linger` - minimum time to occur before another host may attempt a lease on
a flow event. It should be much greater
+ * than epsilon and encapsulate executor communication latency
including retry attempts
+ *
+ * The `event_timestamp` is the time of the flow_action event request.
+ * ---Event consolidation---
+ * Note that for the sake of simplification, we only allow one event
associated with a particular flow's flow_action
+ * (ie: only one LAUNCH for example of flow FOO, but there can be a LAUNCH,
KILL, & RESUME for flow FOO at once) during
+ * the time it takes to execute the flow action. In most cases, the execution
time should be so negligible that this
+ * event consolidation of duplicate flow action requests is not noticed and
even during executor downtime this behavior
+ * is acceptable as the user generally expects a timely execution of the most
recent request rather than one execution
+ * per request.
+ *
+ * The `lease_acquisition_timestamp` is the time a host acquired ownership of
this flow action, and it is valid for
+ * `linger` period of time after which it expires and any host can re-attempt
ownership. In most cases, the original
+ * host should actually complete its work while having the lease and then mark
the flow action as NULL to indicate no
+ * further leasing should be done for the event.
+ */
+public class MySQLMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
+ /** `j.u.Function` variant for an operation that may @throw IOException or
SQLException: preserves method signature checked exceptions */
+ @FunctionalInterface
+ protected interface CheckedFunction<T, R> {
+ R apply(T t) throws IOException, SQLException;
+ }
+
+ public static final String CONFIG_PREFIX = "MySQLMultiActiveLeaseArbiter";
+
+ protected final DataSource dataSource;
+ private final String leaseArbiterTableName;
+ private final String constantsTableName;
+ private final int epsilon;
+ private final int linger;
+ protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ + " AND flow_action=?";
+ protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
Review Comment:
`WHERE_CLAUSE_TO_MATCH_KEY + " AND event_timestamp=? AND
lease_acquisition_timestamp=?"`?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MySQLMultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import javax.sql.DataSource;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * MySQL based implementation of the {@link MultiActiveLeaseArbiter} which
uses a MySQL store to resolve ownership of
+ * a flow event amongst multiple competing instances. A MySQL table is used to
store flow identifying information as
+ * well as the flow action associated with it. It uses two additional values
of the `event_timestamp` and
+ * `lease_acquisition_timestamp` to indicate an active lease, expired lease,
and state of no longer leasing. The table
+ * schema is as follows:
+ * [flow_group | flow_name | flow_execution_id | flow_action | event_timestamp
| lease_acquisition_timestamp]
+ * (----------------------primary key------------------------)
+ * We also maintain another table in the database with two constants that
allow us to coordinate between instances and
+ * ensure they are using the same values to base their coordination off of.
+ * [epsilon | linger]
+ * `epsilon` - time within we consider to timestamps to be the same, to
account for between-host clock drift
+ * `linger` - minimum time to occur before another host may attempt a lease on
a flow event. It should be much greater
+ * than epsilon and encapsulate executor communication latency
including retry attempts
+ *
+ * The `event_timestamp` is the time of the flow_action event request.
+ * ---Event consolidation---
+ * Note that for the sake of simplification, we only allow one event
associated with a particular flow's flow_action
+ * (ie: only one LAUNCH for example of flow FOO, but there can be a LAUNCH,
KILL, & RESUME for flow FOO at once) during
+ * the time it takes to execute the flow action. In most cases, the execution
time should be so negligible that this
+ * event consolidation of duplicate flow action requests is not noticed and
even during executor downtime this behavior
+ * is acceptable as the user generally expects a timely execution of the most
recent request rather than one execution
+ * per request.
+ *
+ * The `lease_acquisition_timestamp` is the time a host acquired ownership of
this flow action, and it is valid for
+ * `linger` period of time after which it expires and any host can re-attempt
ownership. In most cases, the original
+ * host should actually complete its work while having the lease and then mark
the flow action as NULL to indicate no
+ * further leasing should be done for the event.
+ */
+public class MySQLMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
+ /** `j.u.Function` variant for an operation that may @throw IOException or
SQLException: preserves method signature checked exceptions */
+ @FunctionalInterface
+ protected interface CheckedFunction<T, R> {
+ R apply(T t) throws IOException, SQLException;
+ }
+
+ public static final String CONFIG_PREFIX = "MySQLMultiActiveLeaseArbiter";
+
+ protected final DataSource dataSource;
+ private final String leaseArbiterTableName;
+ private final String constantsTableName;
+ private final int epsilon;
+ private final int linger;
+ protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ + " AND flow_action=?";
+ protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ + " AND flow_action=? AND event_timestamp=? AND
lease_acquisition_timestamp=?";
+
+ protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT
ROW_COUNT() AS rows_inserted_count, "
+ + "lease_acquisition_timestamp, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+
+ // Does a cross join between the two tables to have epsilon and linger
values available. Returns the following values:
+ // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if
event_timestamp in table is within
+ // epsilon), leaseValidityStatus (1 if lease has not expired, 2 if expired,
3 if column is NULL or no longer leasing)
+ protected static final String GET_EVENT_INFO_STATEMENT = "SELECT
event_timestamp, lease_acquisition_timestamp, "
+ + "abs(event_timestamp - ?) <= epsilon as isWithinEpsilon, CASE "
+ + "WHEN CURRENT_TIMESTAMP < (lease_acquisition_timestamp + linger) then
1"
+ + "WHEN CURRENT_TIMESTAMP >= (lease_acquisition_timestamp + linger) then
2"
+ + "ELSE 3 END as leaseValidityStatus, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+
+ // Insert or update row to acquire lease if values have not changed since
the previous read
+ // Need to define three separate statements to handle cases where row does
not exist or has null values to check
+ protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s "
Review Comment:
`CONDITIONALLY_INITIALIZE_LEASE_STATEMENT` or
`CONDITIONALLY_CREATE_LEASE_STATEMENT`?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MySQLMultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import javax.sql.DataSource;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * MySQL based implementation of the {@link MultiActiveLeaseArbiter} which
uses a MySQL store to resolve ownership of
+ * a flow event amongst multiple competing instances. A MySQL table is used to
store flow identifying information as
+ * well as the flow action associated with it. It uses two additional values
of the `event_timestamp` and
+ * `lease_acquisition_timestamp` to indicate an active lease, expired lease,
and state of no longer leasing. The table
+ * schema is as follows:
+ * [flow_group | flow_name | flow_execution_id | flow_action | event_timestamp
| lease_acquisition_timestamp]
+ * (----------------------primary key------------------------)
+ * We also maintain another table in the database with two constants that
allow us to coordinate between instances and
+ * ensure they are using the same values to base their coordination off of.
+ * [epsilon | linger]
+ * `epsilon` - time within we consider to timestamps to be the same, to
account for between-host clock drift
+ * `linger` - minimum time to occur before another host may attempt a lease on
a flow event. It should be much greater
+ * than epsilon and encapsulate executor communication latency
including retry attempts
+ *
+ * The `event_timestamp` is the time of the flow_action event request.
+ * ---Event consolidation---
+ * Note that for the sake of simplification, we only allow one event
associated with a particular flow's flow_action
+ * (ie: only one LAUNCH for example of flow FOO, but there can be a LAUNCH,
KILL, & RESUME for flow FOO at once) during
+ * the time it takes to execute the flow action. In most cases, the execution
time should be so negligible that this
+ * event consolidation of duplicate flow action requests is not noticed and
even during executor downtime this behavior
+ * is acceptable as the user generally expects a timely execution of the most
recent request rather than one execution
+ * per request.
+ *
+ * The `lease_acquisition_timestamp` is the time a host acquired ownership of
this flow action, and it is valid for
+ * `linger` period of time after which it expires and any host can re-attempt
ownership. In most cases, the original
+ * host should actually complete its work while having the lease and then mark
the flow action as NULL to indicate no
+ * further leasing should be done for the event.
+ */
+public class MySQLMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
+ /** `j.u.Function` variant for an operation that may @throw IOException or
SQLException: preserves method signature checked exceptions */
+ @FunctionalInterface
+ protected interface CheckedFunction<T, R> {
+ R apply(T t) throws IOException, SQLException;
+ }
+
+ public static final String CONFIG_PREFIX = "MySQLMultiActiveLeaseArbiter";
+
+ protected final DataSource dataSource;
+ private final String leaseArbiterTableName;
+ private final String constantsTableName;
+ private final int epsilon;
+ private final int linger;
+ protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ + " AND flow_action=?";
+ protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ + " AND flow_action=? AND event_timestamp=? AND
lease_acquisition_timestamp=?";
+
+ protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT
ROW_COUNT() AS rows_inserted_count, "
+ + "lease_acquisition_timestamp, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+
+ // Does a cross join between the two tables to have epsilon and linger
values available. Returns the following values:
+ // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if
event_timestamp in table is within
+ // epsilon), leaseValidityStatus (1 if lease has not expired, 2 if expired,
3 if column is NULL or no longer leasing)
+ protected static final String GET_EVENT_INFO_STATEMENT = "SELECT
event_timestamp, lease_acquisition_timestamp, "
+ + "abs(event_timestamp - ?) <= epsilon as isWithinEpsilon, CASE "
+ + "WHEN CURRENT_TIMESTAMP < (lease_acquisition_timestamp + linger) then
1"
+ + "WHEN CURRENT_TIMESTAMP >= (lease_acquisition_timestamp + linger) then
2"
+ + "ELSE 3 END as leaseValidityStatus, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+
+ // Insert or update row to acquire lease if values have not changed since
the previous read
+ // Need to define three separate statements to handle cases where row does
not exist or has null values to check
+ protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s "
+ + "(flow_group, flow_name, flow_execution_id, flow_action,
event_timestamp) VALUES (?, ?, ?, ?, ?) WHERE NOT "
+ + "EXISTS (SELECT * FROM %s " + WHERE_CLAUSE_TO_MATCH_KEY + "); " +
SELECT_AFTER_INSERT_STATEMENT;
+ protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT = "UPDATE %s "
+ + "SET event_timestamp=?" + WHERE_CLAUSE_TO_MATCH_KEY
+ + " AND event_timestamp=? AND lease_acquisition_timestamp is NULL; " +
SELECT_AFTER_INSERT_STATEMENT;
+ protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT = "UPDATE %s "
+ + "SET event_timestamp=?" + WHERE_CLAUSE_TO_MATCH_ROW
+ + " AND event_timestamp=? AND lease_acquisition_timestamp=?; " +
SELECT_AFTER_INSERT_STATEMENT;
+
+ // Complete lease acquisition if values have not changed since lease was
acquired
+ protected static final String CONDITIONALLY_COMPLETE_LEASE_STATEMENT =
"UPDATE %s SET "
+ + "lease_acquisition_timestamp = NULL " + WHERE_CLAUSE_TO_MATCH_ROW;
+
+ // TODO: define retention on this table
+ private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE
TABLE IF NOT EXISTS %S ("
Review Comment:
nit: place this definition first, so the schema is clear before the other
statements come along
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MySQLMultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import javax.sql.DataSource;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * MySQL based implementation of the {@link MultiActiveLeaseArbiter} which
uses a MySQL store to resolve ownership of
+ * a flow event amongst multiple competing instances. A MySQL table is used to
store flow identifying information as
+ * well as the flow action associated with it. It uses two additional values
of the `event_timestamp` and
+ * `lease_acquisition_timestamp` to indicate an active lease, expired lease,
and state of no longer leasing. The table
+ * schema is as follows:
+ * [flow_group | flow_name | flow_execution_id | flow_action | event_timestamp
| lease_acquisition_timestamp]
+ * (----------------------primary key------------------------)
+ * We also maintain another table in the database with two constants that
allow us to coordinate between instances and
+ * ensure they are using the same values to base their coordination off of.
+ * [epsilon | linger]
+ * `epsilon` - time within we consider to timestamps to be the same, to
account for between-host clock drift
+ * `linger` - minimum time to occur before another host may attempt a lease on
a flow event. It should be much greater
+ * than epsilon and encapsulate executor communication latency
including retry attempts
+ *
+ * The `event_timestamp` is the time of the flow_action event request.
+ * ---Event consolidation---
+ * Note that for the sake of simplification, we only allow one event
associated with a particular flow's flow_action
+ * (ie: only one LAUNCH for example of flow FOO, but there can be a LAUNCH,
KILL, & RESUME for flow FOO at once) during
+ * the time it takes to execute the flow action. In most cases, the execution
time should be so negligible that this
+ * event consolidation of duplicate flow action requests is not noticed and
even during executor downtime this behavior
+ * is acceptable as the user generally expects a timely execution of the most
recent request rather than one execution
+ * per request.
+ *
+ * The `lease_acquisition_timestamp` is the time a host acquired ownership of
this flow action, and it is valid for
+ * `linger` period of time after which it expires and any host can re-attempt
ownership. In most cases, the original
+ * host should actually complete its work while having the lease and then mark
the flow action as NULL to indicate no
+ * further leasing should be done for the event.
+ */
+public class MySQLMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
+ /** `j.u.Function` variant for an operation that may @throw IOException or
SQLException: preserves method signature checked exceptions */
+ @FunctionalInterface
+ protected interface CheckedFunction<T, R> {
+ R apply(T t) throws IOException, SQLException;
+ }
+
+ public static final String CONFIG_PREFIX = "MySQLMultiActiveLeaseArbiter";
+
+ protected final DataSource dataSource;
+ private final String leaseArbiterTableName;
+ private final String constantsTableName;
+ private final int epsilon;
+ private final int linger;
+ protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ + " AND flow_action=?";
+ protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ + " AND flow_action=? AND event_timestamp=? AND
lease_acquisition_timestamp=?";
+
+ protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT
ROW_COUNT() AS rows_inserted_count, "
+ + "lease_acquisition_timestamp, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+
+ // Does a cross join between the two tables to have epsilon and linger
values available. Returns the following values:
+ // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if
event_timestamp in table is within
+ // epsilon), leaseValidityStatus (1 if lease has not expired, 2 if expired,
3 if column is NULL or no longer leasing)
+ protected static final String GET_EVENT_INFO_STATEMENT = "SELECT
event_timestamp, lease_acquisition_timestamp, "
+ + "abs(event_timestamp - ?) <= epsilon as isWithinEpsilon, CASE "
+ + "WHEN CURRENT_TIMESTAMP < (lease_acquisition_timestamp + linger) then
1"
+ + "WHEN CURRENT_TIMESTAMP >= (lease_acquisition_timestamp + linger) then
2"
+ + "ELSE 3 END as leaseValidityStatus, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+
+ // Insert or update row to acquire lease if values have not changed since
the previous read
+ // Need to define three separate statements to handle cases where row does
not exist or has null values to check
+ protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s "
+ + "(flow_group, flow_name, flow_execution_id, flow_action,
event_timestamp) VALUES (?, ?, ?, ?, ?) WHERE NOT "
+ + "EXISTS (SELECT * FROM %s " + WHERE_CLAUSE_TO_MATCH_KEY + "); " +
SELECT_AFTER_INSERT_STATEMENT;
Review Comment:
is this `WHERE NOT EXISTS` necessary? isn't it equivalent to the PK
uniqueness constraint already to be enforced?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/SchedulerLeaseAlgoHandler.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Random;
+
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.LeaseAttemptStatus;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.scheduler.JobScheduler;
+import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.runtime.api.LeaseObtainedStatus;
+import org.apache.gobblin.runtime.api.LeasedToAnotherStatus;
+
+
+/**
+ * Handler used to coordinate multiple hosts with enabled schedulers to
respond to flow action events. It uses the
+ * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter} to
determine a single lease owner at a given time
+ * for a flow action event. After acquiring the lease, it persists the flow
action event to the {@link DagActionStore}
+ * to be eventually acted upon by the host with the active DagManager. Once it
has completed this action, it will mark
+ * the lease as completed by calling the
+ * {@link
org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter.completeLeaseUse}
method. Hosts that do not gain
+ * the lease for the event, instead schedule a reminder using the {@link
SchedulerService} to check back in on the
+ * previous lease owner's completion status after the lease should expire to
ensure the event is handled in failure
+ * cases.
+ */
+public class SchedulerLeaseAlgoHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(SchedulerLeaseAlgoHandler.class);
+ private final int staggerUpperBoundSec;
+ private static Random random = new Random();
+ protected MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+ protected JobScheduler jobScheduler;
+ protected SchedulerService schedulerService;
+ protected DagActionStore dagActionStore;
+ private MetricContext metricContext;
+ private ContextAwareMeter numLeasesCompleted;
+ @Inject
+ public SchedulerLeaseAlgoHandler(Config config, MultiActiveLeaseArbiter
leaseDeterminationStore,
+ JobScheduler jobScheduler, SchedulerService schedulerService,
DagActionStore dagActionStore) {
+ this.staggerUpperBoundSec = ConfigUtils.getInt(config,
+ ConfigurationKeys.SCHEDULER_STAGGERING_UPPER_BOUND_SEC_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_STAGGERING_UPPER_BOUND_SEC);
+ this.multiActiveLeaseArbiter = leaseDeterminationStore;
Review Comment:
param name doesn't match
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/SchedulerLeaseAlgoHandler.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Random;
+
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.LeaseAttemptStatus;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.scheduler.JobScheduler;
+import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.runtime.api.LeaseObtainedStatus;
+import org.apache.gobblin.runtime.api.LeasedToAnotherStatus;
+
+
+/**
+ * Handler used to coordinate multiple hosts with enabled schedulers to
respond to flow action events. It uses the
+ * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter} to
determine a single lease owner at a given time
+ * for a flow action event. After acquiring the lease, it persists the flow
action event to the {@link DagActionStore}
+ * to be eventually acted upon by the host with the active DagManager. Once it
has completed this action, it will mark
+ * the lease as completed by calling the
+ * {@link
org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter.completeLeaseUse}
method. Hosts that do not gain
+ * the lease for the event, instead schedule a reminder using the {@link
SchedulerService} to check back in on the
+ * previous lease owner's completion status after the lease should expire to
ensure the event is handled in failure
+ * cases.
+ */
+public class SchedulerLeaseAlgoHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(SchedulerLeaseAlgoHandler.class);
+ private final int staggerUpperBoundSec;
+ private static Random random = new Random();
+ protected MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+ protected JobScheduler jobScheduler;
+ protected SchedulerService schedulerService;
+ protected DagActionStore dagActionStore;
+ private MetricContext metricContext;
+ private ContextAwareMeter numLeasesCompleted;
+ @Inject
+ public SchedulerLeaseAlgoHandler(Config config, MultiActiveLeaseArbiter
leaseDeterminationStore,
+ JobScheduler jobScheduler, SchedulerService schedulerService,
DagActionStore dagActionStore) {
+ this.staggerUpperBoundSec = ConfigUtils.getInt(config,
+ ConfigurationKeys.SCHEDULER_STAGGERING_UPPER_BOUND_SEC_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_STAGGERING_UPPER_BOUND_SEC);
+ this.multiActiveLeaseArbiter = leaseDeterminationStore;
+ this.jobScheduler = jobScheduler;
+ this.schedulerService = schedulerService;
+ this.dagActionStore = dagActionStore;
+ this.metricContext = Instrumented.getMetricContext(new
org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
+ this.getClass());
+ this.numLeasesCompleted =
metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_SCHEDULER_LEASE_ALGO_HANDLER_NUM_LEASES_COMPLETED);
+ }
+
+ /**
+ * This method is used in the multi-active scheduler case for one or more
hosts to respond to a flow action event
+ * by attempting a lease for the flow event and processing the result
depending on the status of the attempt.
+ * @param jobProps
+ * @param flowAction
+ * @param eventTimeMillis
+ * @throws IOException
+ */
+ public void handleNewSchedulerEvent(Properties jobProps,
DagActionStore.DagAction flowAction, long eventTimeMillis)
+ throws IOException {
+ LeaseAttemptStatus leaseAttemptStatus =
+ multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
+ // TODO: add a log event or metric for each of these cases
+ switch (leaseAttemptStatus.getClass().getSimpleName()) {
+ case "LeaseObtainedStatus":
Review Comment:
`instanceof` would be more canonical
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/SchedulerLeaseAlgoHandler.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Random;
+
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.LeaseAttemptStatus;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.scheduler.JobScheduler;
+import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.runtime.api.LeaseObtainedStatus;
+import org.apache.gobblin.runtime.api.LeasedToAnotherStatus;
+
+
+/**
+ * Handler used to coordinate multiple hosts with enabled schedulers to
respond to flow action events. It uses the
+ * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter} to
determine a single lease owner at a given time
+ * for a flow action event. After acquiring the lease, it persists the flow
action event to the {@link DagActionStore}
+ * to be eventually acted upon by the host with the active DagManager. Once it
has completed this action, it will mark
+ * the lease as completed by calling the
+ * {@link
org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter.completeLeaseUse}
method. Hosts that do not gain
+ * the lease for the event, instead schedule a reminder using the {@link
SchedulerService} to check back in on the
+ * previous lease owner's completion status after the lease should expire to
ensure the event is handled in failure
+ * cases.
+ */
+public class SchedulerLeaseAlgoHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(SchedulerLeaseAlgoHandler.class);
+ private final int staggerUpperBoundSec;
+ private static Random random = new Random();
+ protected MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+ protected JobScheduler jobScheduler;
+ protected SchedulerService schedulerService;
+ protected DagActionStore dagActionStore;
+ private MetricContext metricContext;
+ private ContextAwareMeter numLeasesCompleted;
+ @Inject
+ public SchedulerLeaseAlgoHandler(Config config, MultiActiveLeaseArbiter
leaseDeterminationStore,
+ JobScheduler jobScheduler, SchedulerService schedulerService,
DagActionStore dagActionStore) {
+ this.staggerUpperBoundSec = ConfigUtils.getInt(config,
+ ConfigurationKeys.SCHEDULER_STAGGERING_UPPER_BOUND_SEC_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_STAGGERING_UPPER_BOUND_SEC);
+ this.multiActiveLeaseArbiter = leaseDeterminationStore;
+ this.jobScheduler = jobScheduler;
+ this.schedulerService = schedulerService;
+ this.dagActionStore = dagActionStore;
+ this.metricContext = Instrumented.getMetricContext(new
org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
+ this.getClass());
+ this.numLeasesCompleted =
metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_SCHEDULER_LEASE_ALGO_HANDLER_NUM_LEASES_COMPLETED);
+ }
+
+ /**
+ * This method is used in the multi-active scheduler case for one or more
hosts to respond to a flow action event
+ * by attempting a lease for the flow event and processing the result
depending on the status of the attempt.
+ * @param jobProps
+ * @param flowAction
+ * @param eventTimeMillis
+ * @throws IOException
+ */
+ public void handleNewSchedulerEvent(Properties jobProps,
DagActionStore.DagAction flowAction, long eventTimeMillis)
+ throws IOException {
+ LeaseAttemptStatus leaseAttemptStatus =
+ multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
+ // TODO: add a log event or metric for each of these cases
+ switch (leaseAttemptStatus.getClass().getSimpleName()) {
+ case "LeaseObtainedStatus":
+ finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
+ break;
+ case "LeasedToAnotherStatus":
+ scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus)
leaseAttemptStatus, flowAction, eventTimeMillis);
+ break;
+ case "NoLongerLeasingStatus":
+ break;
+ default:
Review Comment:
let's code defensively by throwing an exception even in cases we believe
"can't happen"
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/SchedulerLeaseAlgoHandler.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Random;
+
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.LeaseAttemptStatus;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.scheduler.JobScheduler;
+import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.runtime.api.LeaseObtainedStatus;
+import org.apache.gobblin.runtime.api.LeasedToAnotherStatus;
+
+
+/**
+ * Handler used to coordinate multiple hosts with enabled schedulers to
respond to flow action events. It uses the
+ * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter} to
determine a single lease owner at a given time
+ * for a flow action event. After acquiring the lease, it persists the flow
action event to the {@link DagActionStore}
+ * to be eventually acted upon by the host with the active DagManager. Once it
has completed this action, it will mark
+ * the lease as completed by calling the
+ * {@link
org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter.completeLeaseUse}
method. Hosts that do not gain
+ * the lease for the event, instead schedule a reminder using the {@link
SchedulerService} to check back in on the
+ * previous lease owner's completion status after the lease should expire to
ensure the event is handled in failure
+ * cases.
+ */
+public class SchedulerLeaseAlgoHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(SchedulerLeaseAlgoHandler.class);
+ private final int staggerUpperBoundSec;
+ private static Random random = new Random();
+ protected MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+ protected JobScheduler jobScheduler;
+ protected SchedulerService schedulerService;
+ protected DagActionStore dagActionStore;
+ private MetricContext metricContext;
+ private ContextAwareMeter numLeasesCompleted;
+ @Inject
+ public SchedulerLeaseAlgoHandler(Config config, MultiActiveLeaseArbiter
leaseDeterminationStore,
+ JobScheduler jobScheduler, SchedulerService schedulerService,
DagActionStore dagActionStore) {
+ this.staggerUpperBoundSec = ConfigUtils.getInt(config,
+ ConfigurationKeys.SCHEDULER_STAGGERING_UPPER_BOUND_SEC_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_STAGGERING_UPPER_BOUND_SEC);
+ this.multiActiveLeaseArbiter = leaseDeterminationStore;
+ this.jobScheduler = jobScheduler;
+ this.schedulerService = schedulerService;
+ this.dagActionStore = dagActionStore;
+ this.metricContext = Instrumented.getMetricContext(new
org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
+ this.getClass());
+ this.numLeasesCompleted =
metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_SCHEDULER_LEASE_ALGO_HANDLER_NUM_LEASES_COMPLETED);
+ }
+
+ /**
+ * This method is used in the multi-active scheduler case for one or more
hosts to respond to a flow action event
+ * by attempting a lease for the flow event and processing the result
depending on the status of the attempt.
+ * @param jobProps
+ * @param flowAction
+ * @param eventTimeMillis
+ * @throws IOException
+ */
+ public void handleNewSchedulerEvent(Properties jobProps,
DagActionStore.DagAction flowAction, long eventTimeMillis)
Review Comment:
AFAICT, this is the only "real" method in the class's external interface.
revisiting the class and method naming in light of that might suggest
`FlowTriggerHandler`, since it's not really "handling" a "scheduler lease".
WDYT?
if so this method might be `handleTriggerEvent` or `handle`, or even just
`apply`
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MySQLMultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import javax.sql.DataSource;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * MySQL based implementation of the {@link MultiActiveLeaseArbiter} which
uses a MySQL store to resolve ownership of
+ * a flow event amongst multiple competing instances. A MySQL table is used to
store flow identifying information as
+ * well as the flow action associated with it. It uses two additional values
of the `event_timestamp` and
+ * `lease_acquisition_timestamp` to indicate an active lease, expired lease,
and state of no longer leasing. The table
+ * schema is as follows:
+ * [flow_group | flow_name | flow_execution_id | flow_action | event_timestamp
| lease_acquisition_timestamp]
+ * (----------------------primary key------------------------)
+ * We also maintain another table in the database with two constants that
allow us to coordinate between instances and
+ * ensure they are using the same values to base their coordination off of.
+ * [epsilon | linger]
+ * `epsilon` - time within we consider to timestamps to be the same, to
account for between-host clock drift
+ * `linger` - minimum time to occur before another host may attempt a lease on
a flow event. It should be much greater
+ * than epsilon and encapsulate executor communication latency
including retry attempts
+ *
+ * The `event_timestamp` is the time of the flow_action event request.
+ * ---Event consolidation---
+ * Note that for the sake of simplification, we only allow one event
associated with a particular flow's flow_action
+ * (ie: only one LAUNCH for example of flow FOO, but there can be a LAUNCH,
KILL, & RESUME for flow FOO at once) during
+ * the time it takes to execute the flow action. In most cases, the execution
time should be so negligible that this
+ * event consolidation of duplicate flow action requests is not noticed and
even during executor downtime this behavior
+ * is acceptable as the user generally expects a timely execution of the most
recent request rather than one execution
+ * per request.
+ *
+ * The `lease_acquisition_timestamp` is the time a host acquired ownership of
this flow action, and it is valid for
+ * `linger` period of time after which it expires and any host can re-attempt
ownership. In most cases, the original
+ * host should actually complete its work while having the lease and then mark
the flow action as NULL to indicate no
+ * further leasing should be done for the event.
+ */
+public class MySQLMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
+ /** `j.u.Function` variant for an operation that may @throw IOException or
SQLException: preserves method signature checked exceptions */
+ @FunctionalInterface
+ protected interface CheckedFunction<T, R> {
+ R apply(T t) throws IOException, SQLException;
+ }
+
+ public static final String CONFIG_PREFIX = "MySQLMultiActiveLeaseArbiter";
+
+ protected final DataSource dataSource;
+ private final String leaseArbiterTableName;
+ private final String constantsTableName;
+ private final int epsilon;
+ private final int linger;
+ protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ + " AND flow_action=?";
+ protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ + " AND flow_action=? AND event_timestamp=? AND
lease_acquisition_timestamp=?";
+
+ protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT
ROW_COUNT() AS rows_inserted_count, "
+ + "lease_acquisition_timestamp, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+
+ // Does a cross join between the two tables to have epsilon and linger
values available. Returns the following values:
+ // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if
event_timestamp in table is within
+ // epsilon), leaseValidityStatus (1 if lease has not expired, 2 if expired,
3 if column is NULL or no longer leasing)
+ protected static final String GET_EVENT_INFO_STATEMENT = "SELECT
event_timestamp, lease_acquisition_timestamp, "
+ + "abs(event_timestamp - ?) <= epsilon as isWithinEpsilon, CASE "
+ + "WHEN CURRENT_TIMESTAMP < (lease_acquisition_timestamp + linger) then
1"
+ + "WHEN CURRENT_TIMESTAMP >= (lease_acquisition_timestamp + linger) then
2"
+ + "ELSE 3 END as leaseValidityStatus, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+
+ // Insert or update row to acquire lease if values have not changed since
the previous read
+ // Need to define three separate statements to handle cases where row does
not exist or has null values to check
+ protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s "
+ + "(flow_group, flow_name, flow_execution_id, flow_action,
event_timestamp) VALUES (?, ?, ?, ?, ?) WHERE NOT "
+ + "EXISTS (SELECT * FROM %s " + WHERE_CLAUSE_TO_MATCH_KEY + "); " +
SELECT_AFTER_INSERT_STATEMENT;
+ protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT = "UPDATE %s "
+ + "SET event_timestamp=?" + WHERE_CLAUSE_TO_MATCH_KEY
+ + " AND event_timestamp=? AND lease_acquisition_timestamp is NULL; " +
SELECT_AFTER_INSERT_STATEMENT;
+ protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT = "UPDATE %s "
+ + "SET event_timestamp=?" + WHERE_CLAUSE_TO_MATCH_ROW
+ + " AND event_timestamp=? AND lease_acquisition_timestamp=?; " +
SELECT_AFTER_INSERT_STATEMENT;
+
+ // Complete lease acquisition if values have not changed since lease was
acquired
+ protected static final String CONDITIONALLY_COMPLETE_LEASE_STATEMENT =
"UPDATE %s SET "
+ + "lease_acquisition_timestamp = NULL " + WHERE_CLAUSE_TO_MATCH_ROW;
+
+ // TODO: define retention on this table
+ private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE
TABLE IF NOT EXISTS %S ("
+ + "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ")
NOT NULL, flow_name varchar("
+ + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " +
"flow_execution_id varchar("
+ + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL,
flow_action varchar(100) NOT NULL, "
+ + "event_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, "
+ + "lease_acquisition_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"
+ + "PRIMARY KEY (flow_group,flow_name,flow_execution_id,flow_action))";
+
+ private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE
IF NOT EXISTS %s "
+ + "(epsilon INT, linger INT), PRIMARY KEY (epsilon, linger); INSERT INTO
%s (epsilon, linger) VALUES (?,?)";
+
+ @Inject
+ public MySQLMultiActiveLeaseArbiter(Config config) throws IOException {
+ if (config.hasPath(CONFIG_PREFIX)) {
+ config = config.getConfig(CONFIG_PREFIX).withFallback(config);
+ } else {
+ throw new IOException("Please specify the config for
MySQLMultiActiveLeaseArbiter");
+ }
+
+ this.leaseArbiterTableName = ConfigUtils.getString(config,
ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY,
+
ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE);
+ this.constantsTableName = ConfigUtils.getString(config,
ConfigurationKeys.MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY,
+ ConfigurationKeys.DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE);
+ this.epsilon = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS);
+ this.linger = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
+ this.dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement createStatement =
connection.prepareStatement(String.format(
+ CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName))) {
+ createStatement.executeUpdate();
+ connection.commit();
+ } catch (SQLException e) {
+ throw new IOException("Table creation failure for " +
leaseArbiterTableName, e);
+ }
+ withPreparedStatement(String.format(CREATE_CONSTANTS_TABLE_STATEMENT,
this.constantsTableName, this.constantsTableName),
+ createStatement -> {
+ int i = 0;
+ createStatement.setInt(++i, epsilon);
+ createStatement.setInt(++i, linger);
+ return createStatement.executeUpdate();}, true);
+ }
+
+ @Override
+ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction
flowAction, long eventTimeMillis)
+ throws IOException {
+ String flowGroup = flowAction.getFlowGroup();
+ String flowName = flowAction.getFlowName();
+ String flowExecutionId = flowAction.getFlowExecutionId();
+ Timestamp eventTimestamp = new Timestamp(eventTimeMillis);
Review Comment:
up to you, but not sure these add anything beyond, say:
```
getInfoStatement.setTimestamp(++i, flowAction.getFlowGroup());
```
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MySQLMultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import javax.sql.DataSource;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * MySQL based implementation of the {@link MultiActiveLeaseArbiter} which
uses a MySQL store to resolve ownership of
+ * a flow event amongst multiple competing instances. A MySQL table is used to
store flow identifying information as
+ * well as the flow action associated with it. It uses two additional values
of the `event_timestamp` and
+ * `lease_acquisition_timestamp` to indicate an active lease, expired lease,
and state of no longer leasing. The table
+ * schema is as follows:
+ * [flow_group | flow_name | flow_execution_id | flow_action | event_timestamp
| lease_acquisition_timestamp]
+ * (----------------------primary key------------------------)
+ * We also maintain another table in the database with two constants that
allow us to coordinate between instances and
+ * ensure they are using the same values to base their coordination off of.
+ * [epsilon | linger]
+ * `epsilon` - time within we consider to timestamps to be the same, to
account for between-host clock drift
+ * `linger` - minimum time to occur before another host may attempt a lease on
a flow event. It should be much greater
+ * than epsilon and encapsulate executor communication latency
including retry attempts
+ *
+ * The `event_timestamp` is the time of the flow_action event request.
+ * ---Event consolidation---
+ * Note that for the sake of simplification, we only allow one event
associated with a particular flow's flow_action
+ * (ie: only one LAUNCH for example of flow FOO, but there can be a LAUNCH,
KILL, & RESUME for flow FOO at once) during
+ * the time it takes to execute the flow action. In most cases, the execution
time should be so negligible that this
+ * event consolidation of duplicate flow action requests is not noticed and
even during executor downtime this behavior
+ * is acceptable as the user generally expects a timely execution of the most
recent request rather than one execution
+ * per request.
+ *
+ * The `lease_acquisition_timestamp` is the time a host acquired ownership of
this flow action, and it is valid for
+ * `linger` period of time after which it expires and any host can re-attempt
ownership. In most cases, the original
+ * host should actually complete its work while having the lease and then mark
the flow action as NULL to indicate no
+ * further leasing should be done for the event.
+ */
+public class MySQLMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
+ /** `j.u.Function` variant for an operation that may @throw IOException or
SQLException: preserves method signature checked exceptions */
+ @FunctionalInterface
+ protected interface CheckedFunction<T, R> {
+ R apply(T t) throws IOException, SQLException;
+ }
+
+ public static final String CONFIG_PREFIX = "MySQLMultiActiveLeaseArbiter";
+
+ protected final DataSource dataSource;
+ private final String leaseArbiterTableName;
+ private final String constantsTableName;
+ private final int epsilon;
+ private final int linger;
+ protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ + " AND flow_action=?";
+ protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ + " AND flow_action=? AND event_timestamp=? AND
lease_acquisition_timestamp=?";
+
+ protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT
ROW_COUNT() AS rows_inserted_count, "
+ + "lease_acquisition_timestamp, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
Review Comment:
do we also need to get the `event_timestamp` in case another value,
different from ours was successfully inserted?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MySQLMultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import javax.sql.DataSource;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * MySQL based implementation of the {@link MultiActiveLeaseArbiter} which
uses a MySQL store to resolve ownership of
+ * a flow event amongst multiple competing instances. A MySQL table is used to
store flow identifying information as
+ * well as the flow action associated with it. It uses two additional values
of the `event_timestamp` and
+ * `lease_acquisition_timestamp` to indicate an active lease, expired lease,
and state of no longer leasing. The table
+ * schema is as follows:
+ * [flow_group | flow_name | flow_execution_id | flow_action | event_timestamp
| lease_acquisition_timestamp]
+ * (----------------------primary key------------------------)
+ * We also maintain another table in the database with two constants that
allow us to coordinate between instances and
+ * ensure they are using the same values to base their coordination off of.
+ * [epsilon | linger]
+ * `epsilon` - time within we consider to timestamps to be the same, to
account for between-host clock drift
+ * `linger` - minimum time to occur before another host may attempt a lease on
a flow event. It should be much greater
+ * than epsilon and encapsulate executor communication latency
including retry attempts
+ *
+ * The `event_timestamp` is the time of the flow_action event request.
+ * ---Event consolidation---
+ * Note that for the sake of simplification, we only allow one event
associated with a particular flow's flow_action
+ * (ie: only one LAUNCH for example of flow FOO, but there can be a LAUNCH,
KILL, & RESUME for flow FOO at once) during
+ * the time it takes to execute the flow action. In most cases, the execution
time should be so negligible that this
+ * event consolidation of duplicate flow action requests is not noticed and
even during executor downtime this behavior
+ * is acceptable as the user generally expects a timely execution of the most
recent request rather than one execution
+ * per request.
+ *
+ * The `lease_acquisition_timestamp` is the time a host acquired ownership of
this flow action, and it is valid for
+ * `linger` period of time after which it expires and any host can re-attempt
ownership. In most cases, the original
+ * host should actually complete its work while having the lease and then mark
the flow action as NULL to indicate no
+ * further leasing should be done for the event.
+ */
+public class MySQLMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
+ /** `j.u.Function` variant for an operation that may @throw IOException or
SQLException: preserves method signature checked exceptions */
+ @FunctionalInterface
+ protected interface CheckedFunction<T, R> {
+ R apply(T t) throws IOException, SQLException;
+ }
+
+ public static final String CONFIG_PREFIX = "MySQLMultiActiveLeaseArbiter";
+
+ protected final DataSource dataSource;
+ private final String leaseArbiterTableName;
+ private final String constantsTableName;
+ private final int epsilon;
+ private final int linger;
+ protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ + " AND flow_action=?";
+ protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ + " AND flow_action=? AND event_timestamp=? AND
lease_acquisition_timestamp=?";
+
+ protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT
ROW_COUNT() AS rows_inserted_count, "
+ + "lease_acquisition_timestamp, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+
+ // Does a cross join between the two tables to have epsilon and linger
values available. Returns the following values:
+ // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if
event_timestamp in table is within
+ // epsilon), leaseValidityStatus (1 if lease has not expired, 2 if expired,
3 if column is NULL or no longer leasing)
+ protected static final String GET_EVENT_INFO_STATEMENT = "SELECT
event_timestamp, lease_acquisition_timestamp, "
+ + "abs(event_timestamp - ?) <= epsilon as isWithinEpsilon, CASE "
+ + "WHEN CURRENT_TIMESTAMP < (lease_acquisition_timestamp + linger) then
1"
+ + "WHEN CURRENT_TIMESTAMP >= (lease_acquisition_timestamp + linger) then
2"
+ + "ELSE 3 END as leaseValidityStatus, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+
+ // Insert or update row to acquire lease if values have not changed since
the previous read
+ // Need to define three separate statements to handle cases where row does
not exist or has null values to check
+ protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s "
+ + "(flow_group, flow_name, flow_execution_id, flow_action,
event_timestamp) VALUES (?, ?, ?, ?, ?) WHERE NOT "
+ + "EXISTS (SELECT * FROM %s " + WHERE_CLAUSE_TO_MATCH_KEY + "); " +
SELECT_AFTER_INSERT_STATEMENT;
+ protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT = "UPDATE %s "
+ + "SET event_timestamp=?" + WHERE_CLAUSE_TO_MATCH_KEY
+ + " AND event_timestamp=? AND lease_acquisition_timestamp is NULL; " +
SELECT_AFTER_INSERT_STATEMENT;
+ protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT = "UPDATE %s "
+ + "SET event_timestamp=?" + WHERE_CLAUSE_TO_MATCH_ROW
+ + " AND event_timestamp=? AND lease_acquisition_timestamp=?; " +
SELECT_AFTER_INSERT_STATEMENT;
+
+ // Complete lease acquisition if values have not changed since lease was
acquired
+ protected static final String CONDITIONALLY_COMPLETE_LEASE_STATEMENT =
"UPDATE %s SET "
+ + "lease_acquisition_timestamp = NULL " + WHERE_CLAUSE_TO_MATCH_ROW;
+
+ // TODO: define retention on this table
+ private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE
TABLE IF NOT EXISTS %S ("
+ + "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ")
NOT NULL, flow_name varchar("
+ + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " +
"flow_execution_id varchar("
+ + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL,
flow_action varchar(100) NOT NULL, "
+ + "event_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, "
+ + "lease_acquisition_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"
+ + "PRIMARY KEY (flow_group,flow_name,flow_execution_id,flow_action))";
+
+ private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE
IF NOT EXISTS %s "
+ + "(epsilon INT, linger INT), PRIMARY KEY (epsilon, linger); INSERT INTO
%s (epsilon, linger) VALUES (?,?)";
+
+ @Inject
+ public MySQLMultiActiveLeaseArbiter(Config config) throws IOException {
+ if (config.hasPath(CONFIG_PREFIX)) {
+ config = config.getConfig(CONFIG_PREFIX).withFallback(config);
+ } else {
+ throw new IOException("Please specify the config for
MySQLMultiActiveLeaseArbiter");
+ }
+
+ this.leaseArbiterTableName = ConfigUtils.getString(config,
ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY,
+
ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE);
+ this.constantsTableName = ConfigUtils.getString(config,
ConfigurationKeys.MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY,
+ ConfigurationKeys.DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE);
+ this.epsilon = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS);
+ this.linger = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
+ this.dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement createStatement =
connection.prepareStatement(String.format(
+ CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName))) {
+ createStatement.executeUpdate();
+ connection.commit();
+ } catch (SQLException e) {
+ throw new IOException("Table creation failure for " +
leaseArbiterTableName, e);
+ }
+ withPreparedStatement(String.format(CREATE_CONSTANTS_TABLE_STATEMENT,
this.constantsTableName, this.constantsTableName),
+ createStatement -> {
+ int i = 0;
+ createStatement.setInt(++i, epsilon);
+ createStatement.setInt(++i, linger);
+ return createStatement.executeUpdate();}, true);
+ }
+
+ @Override
+ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction
flowAction, long eventTimeMillis)
+ throws IOException {
+ String flowGroup = flowAction.getFlowGroup();
+ String flowName = flowAction.getFlowName();
+ String flowExecutionId = flowAction.getFlowExecutionId();
+ Timestamp eventTimestamp = new Timestamp(eventTimeMillis);
+
+ // Check table for an existing entry for this flow action and event time
+ ResultSet resultSet = withPreparedStatement(
+ String.format(GET_EVENT_INFO_STATEMENT, this.leaseArbiterTableName,
this.constantsTableName),
+ getInfoStatement -> {
+ int i = 0;
+ getInfoStatement.setTimestamp(i, eventTimestamp);
+ getInfoStatement.setString(i, flowGroup);
+ getInfoStatement.setString(i, flowName);
+ getInfoStatement.setString(i, flowExecutionId);
+ getInfoStatement.setString(i,
flowAction.getFlowActionType().toString());
Review Comment:
`++i` (!)
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MySQLMultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import javax.sql.DataSource;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * MySQL based implementation of the {@link MultiActiveLeaseArbiter} which
uses a MySQL store to resolve ownership of
+ * a flow event amongst multiple competing instances. A MySQL table is used to
store flow identifying information as
+ * well as the flow action associated with it. It uses two additional values
of the `event_timestamp` and
+ * `lease_acquisition_timestamp` to indicate an active lease, expired lease,
and state of no longer leasing. The table
+ * schema is as follows:
+ * [flow_group | flow_name | flow_execution_id | flow_action | event_timestamp
| lease_acquisition_timestamp]
+ * (----------------------primary key------------------------)
+ * We also maintain another table in the database with two constants that
allow us to coordinate between instances and
+ * ensure they are using the same values to base their coordination off of.
+ * [epsilon | linger]
+ * `epsilon` - time within we consider to timestamps to be the same, to
account for between-host clock drift
+ * `linger` - minimum time to occur before another host may attempt a lease on
a flow event. It should be much greater
+ * than epsilon and encapsulate executor communication latency
including retry attempts
+ *
+ * The `event_timestamp` is the time of the flow_action event request.
+ * ---Event consolidation---
+ * Note that for the sake of simplification, we only allow one event
associated with a particular flow's flow_action
+ * (ie: only one LAUNCH for example of flow FOO, but there can be a LAUNCH,
KILL, & RESUME for flow FOO at once) during
+ * the time it takes to execute the flow action. In most cases, the execution
time should be so negligible that this
+ * event consolidation of duplicate flow action requests is not noticed and
even during executor downtime this behavior
+ * is acceptable as the user generally expects a timely execution of the most
recent request rather than one execution
+ * per request.
+ *
+ * The `lease_acquisition_timestamp` is the time a host acquired ownership of
this flow action, and it is valid for
+ * `linger` period of time after which it expires and any host can re-attempt
ownership. In most cases, the original
+ * host should actually complete its work while having the lease and then mark
the flow action as NULL to indicate no
+ * further leasing should be done for the event.
+ */
+public class MySQLMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
+ /** `j.u.Function` variant for an operation that may @throw IOException or
SQLException: preserves method signature checked exceptions */
+ @FunctionalInterface
+ protected interface CheckedFunction<T, R> {
+ R apply(T t) throws IOException, SQLException;
+ }
+
+ public static final String CONFIG_PREFIX = "MySQLMultiActiveLeaseArbiter";
+
+ protected final DataSource dataSource;
+ private final String leaseArbiterTableName;
+ private final String constantsTableName;
+ private final int epsilon;
+ private final int linger;
+ protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ + " AND flow_action=?";
+ protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ + " AND flow_action=? AND event_timestamp=? AND
lease_acquisition_timestamp=?";
+
+ protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT
ROW_COUNT() AS rows_inserted_count, "
+ + "lease_acquisition_timestamp, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+
+ // Does a cross join between the two tables to have epsilon and linger
values available. Returns the following values:
+ // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if
event_timestamp in table is within
+ // epsilon), leaseValidityStatus (1 if lease has not expired, 2 if expired,
3 if column is NULL or no longer leasing)
+ protected static final String GET_EVENT_INFO_STATEMENT = "SELECT
event_timestamp, lease_acquisition_timestamp, "
+ + "abs(event_timestamp - ?) <= epsilon as isWithinEpsilon, CASE "
+ + "WHEN CURRENT_TIMESTAMP < (lease_acquisition_timestamp + linger) then
1"
+ + "WHEN CURRENT_TIMESTAMP >= (lease_acquisition_timestamp + linger) then
2"
+ + "ELSE 3 END as leaseValidityStatus, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+
+ // Insert or update row to acquire lease if values have not changed since
the previous read
+ // Need to define three separate statements to handle cases where row does
not exist or has null values to check
+ protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s "
+ + "(flow_group, flow_name, flow_execution_id, flow_action,
event_timestamp) VALUES (?, ?, ?, ?, ?) WHERE NOT "
+ + "EXISTS (SELECT * FROM %s " + WHERE_CLAUSE_TO_MATCH_KEY + "); " +
SELECT_AFTER_INSERT_STATEMENT;
+ protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT = "UPDATE %s "
+ + "SET event_timestamp=?" + WHERE_CLAUSE_TO_MATCH_KEY
+ + " AND event_timestamp=? AND lease_acquisition_timestamp is NULL; " +
SELECT_AFTER_INSERT_STATEMENT;
+ protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT = "UPDATE %s "
+ + "SET event_timestamp=?" + WHERE_CLAUSE_TO_MATCH_ROW
+ + " AND event_timestamp=? AND lease_acquisition_timestamp=?; " +
SELECT_AFTER_INSERT_STATEMENT;
+
+ // Complete lease acquisition if values have not changed since lease was
acquired
+ protected static final String CONDITIONALLY_COMPLETE_LEASE_STATEMENT =
"UPDATE %s SET "
+ + "lease_acquisition_timestamp = NULL " + WHERE_CLAUSE_TO_MATCH_ROW;
+
+ // TODO: define retention on this table
+ private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE
TABLE IF NOT EXISTS %S ("
+ + "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ")
NOT NULL, flow_name varchar("
+ + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " +
"flow_execution_id varchar("
+ + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL,
flow_action varchar(100) NOT NULL, "
+ + "event_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, "
+ + "lease_acquisition_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"
+ + "PRIMARY KEY (flow_group,flow_name,flow_execution_id,flow_action))";
+
+ private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE
IF NOT EXISTS %s "
+ + "(epsilon INT, linger INT), PRIMARY KEY (epsilon, linger); INSERT INTO
%s (epsilon, linger) VALUES (?,?)";
+
+ @Inject
+ public MySQLMultiActiveLeaseArbiter(Config config) throws IOException {
+ if (config.hasPath(CONFIG_PREFIX)) {
+ config = config.getConfig(CONFIG_PREFIX).withFallback(config);
+ } else {
+ throw new IOException("Please specify the config for
MySQLMultiActiveLeaseArbiter");
Review Comment:
minor: but I suggest printing `CONFIG_PREFIX` w/ the error, so there's no
confusion there
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MySQLMultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import javax.sql.DataSource;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * MySQL based implementation of the {@link MultiActiveLeaseArbiter} which
uses a MySQL store to resolve ownership of
+ * a flow event amongst multiple competing instances. A MySQL table is used to
store flow identifying information as
+ * well as the flow action associated with it. It uses two additional values
of the `event_timestamp` and
+ * `lease_acquisition_timestamp` to indicate an active lease, expired lease,
and state of no longer leasing. The table
+ * schema is as follows:
+ * [flow_group | flow_name | flow_execution_id | flow_action | event_timestamp
| lease_acquisition_timestamp]
+ * (----------------------primary key------------------------)
+ * We also maintain another table in the database with two constants that
allow us to coordinate between instances and
+ * ensure they are using the same values to base their coordination off of.
+ * [epsilon | linger]
+ * `epsilon` - time within we consider to timestamps to be the same, to
account for between-host clock drift
+ * `linger` - minimum time to occur before another host may attempt a lease on
a flow event. It should be much greater
+ * than epsilon and encapsulate executor communication latency
including retry attempts
+ *
+ * The `event_timestamp` is the time of the flow_action event request.
+ * ---Event consolidation---
+ * Note that for the sake of simplification, we only allow one event
associated with a particular flow's flow_action
+ * (ie: only one LAUNCH for example of flow FOO, but there can be a LAUNCH,
KILL, & RESUME for flow FOO at once) during
+ * the time it takes to execute the flow action. In most cases, the execution
time should be so negligible that this
+ * event consolidation of duplicate flow action requests is not noticed and
even during executor downtime this behavior
+ * is acceptable as the user generally expects a timely execution of the most
recent request rather than one execution
+ * per request.
+ *
+ * The `lease_acquisition_timestamp` is the time a host acquired ownership of
this flow action, and it is valid for
+ * `linger` period of time after which it expires and any host can re-attempt
ownership. In most cases, the original
+ * host should actually complete its work while having the lease and then mark
the flow action as NULL to indicate no
+ * further leasing should be done for the event.
+ */
+public class MySQLMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
+ /** `j.u.Function` variant for an operation that may @throw IOException or
SQLException: preserves method signature checked exceptions */
+ @FunctionalInterface
+ protected interface CheckedFunction<T, R> {
+ R apply(T t) throws IOException, SQLException;
+ }
+
+ public static final String CONFIG_PREFIX = "MySQLMultiActiveLeaseArbiter";
+
+ protected final DataSource dataSource;
+ private final String leaseArbiterTableName;
+ private final String constantsTableName;
+ private final int epsilon;
+ private final int linger;
+ protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ + " AND flow_action=?";
+ protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE
flow_group=? AND flow_name=? AND flow_execution_id=?"
+ + " AND flow_action=? AND event_timestamp=? AND
lease_acquisition_timestamp=?";
+
+ protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT
ROW_COUNT() AS rows_inserted_count, "
+ + "lease_acquisition_timestamp, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+
+ // Does a cross join between the two tables to have epsilon and linger
values available. Returns the following values:
+ // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (boolean if
event_timestamp in table is within
+ // epsilon), leaseValidityStatus (1 if lease has not expired, 2 if expired,
3 if column is NULL or no longer leasing)
+ protected static final String GET_EVENT_INFO_STATEMENT = "SELECT
event_timestamp, lease_acquisition_timestamp, "
+ + "abs(event_timestamp - ?) <= epsilon as isWithinEpsilon, CASE "
+ + "WHEN CURRENT_TIMESTAMP < (lease_acquisition_timestamp + linger) then
1"
+ + "WHEN CURRENT_TIMESTAMP >= (lease_acquisition_timestamp + linger) then
2"
+ + "ELSE 3 END as leaseValidityStatus, linger FROM %s, %s " +
WHERE_CLAUSE_TO_MATCH_KEY;
+
+ // Insert or update row to acquire lease if values have not changed since
the previous read
+ // Need to define three separate statements to handle cases where row does
not exist or has null values to check
+ protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s "
+ + "(flow_group, flow_name, flow_execution_id, flow_action,
event_timestamp) VALUES (?, ?, ?, ?, ?) WHERE NOT "
+ + "EXISTS (SELECT * FROM %s " + WHERE_CLAUSE_TO_MATCH_KEY + "); " +
SELECT_AFTER_INSERT_STATEMENT;
+ protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT = "UPDATE %s "
+ + "SET event_timestamp=?" + WHERE_CLAUSE_TO_MATCH_KEY
+ + " AND event_timestamp=? AND lease_acquisition_timestamp is NULL; " +
SELECT_AFTER_INSERT_STATEMENT;
+ protected static final String
CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT = "UPDATE %s "
+ + "SET event_timestamp=?" + WHERE_CLAUSE_TO_MATCH_ROW
+ + " AND event_timestamp=? AND lease_acquisition_timestamp=?; " +
SELECT_AFTER_INSERT_STATEMENT;
+
+ // Complete lease acquisition if values have not changed since lease was
acquired
+ protected static final String CONDITIONALLY_COMPLETE_LEASE_STATEMENT =
"UPDATE %s SET "
+ + "lease_acquisition_timestamp = NULL " + WHERE_CLAUSE_TO_MATCH_ROW;
+
+ // TODO: define retention on this table
+ private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE
TABLE IF NOT EXISTS %S ("
+ + "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ")
NOT NULL, flow_name varchar("
+ + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " +
"flow_execution_id varchar("
+ + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL,
flow_action varchar(100) NOT NULL, "
+ + "event_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, "
+ + "lease_acquisition_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"
+ + "PRIMARY KEY (flow_group,flow_name,flow_execution_id,flow_action))";
+
+ private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE
IF NOT EXISTS %s "
+ + "(epsilon INT, linger INT), PRIMARY KEY (epsilon, linger); INSERT INTO
%s (epsilon, linger) VALUES (?,?)";
+
+ @Inject
+ public MySQLMultiActiveLeaseArbiter(Config config) throws IOException {
+ if (config.hasPath(CONFIG_PREFIX)) {
+ config = config.getConfig(CONFIG_PREFIX).withFallback(config);
+ } else {
+ throw new IOException("Please specify the config for
MySQLMultiActiveLeaseArbiter");
+ }
+
+ this.leaseArbiterTableName = ConfigUtils.getString(config,
ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY,
+
ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE);
+ this.constantsTableName = ConfigUtils.getString(config,
ConfigurationKeys.MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY,
+ ConfigurationKeys.DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE);
+ this.epsilon = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS);
+ this.linger = ConfigUtils.getInt(config,
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
+ this.dataSource = MysqlDataSourceFactory.get(config,
SharedResourcesBrokerFactory.getImplicitBroker());
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement createStatement =
connection.prepareStatement(String.format(
+ CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName))) {
+ createStatement.executeUpdate();
+ connection.commit();
+ } catch (SQLException e) {
+ throw new IOException("Table creation failure for " +
leaseArbiterTableName, e);
+ }
+ withPreparedStatement(String.format(CREATE_CONSTANTS_TABLE_STATEMENT,
this.constantsTableName, this.constantsTableName),
+ createStatement -> {
+ int i = 0;
+ createStatement.setInt(++i, epsilon);
+ createStatement.setInt(++i, linger);
+ return createStatement.executeUpdate();}, true);
+ }
+
+ @Override
+ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction
flowAction, long eventTimeMillis)
+ throws IOException {
+ String flowGroup = flowAction.getFlowGroup();
+ String flowName = flowAction.getFlowName();
+ String flowExecutionId = flowAction.getFlowExecutionId();
+ Timestamp eventTimestamp = new Timestamp(eventTimeMillis);
+
+ // Check table for an existing entry for this flow action and event time
+ ResultSet resultSet = withPreparedStatement(
+ String.format(GET_EVENT_INFO_STATEMENT, this.leaseArbiterTableName,
this.constantsTableName),
+ getInfoStatement -> {
+ int i = 0;
+ getInfoStatement.setTimestamp(i, eventTimestamp);
+ getInfoStatement.setString(i, flowGroup);
+ getInfoStatement.setString(i, flowName);
+ getInfoStatement.setString(i, flowExecutionId);
+ getInfoStatement.setString(i,
flowAction.getFlowActionType().toString());
+ return getInfoStatement.executeQuery();
+ }, true);
+
+ try {
+ // CASE 1: If no existing row for this flow action, then go ahead and
insert
+ if (!resultSet.next()) {
+ ResultSet rs = withPreparedStatement(
+ String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT,
this.leaseArbiterTableName,
+ this.leaseArbiterTableName, this.leaseArbiterTableName,
this.constantsTableName),
+ insertStatement -> {
+ completeInsertPreparedStatement(insertStatement, flowAction,
eventTimeMillis);
+ return insertStatement.executeQuery();
+ }, true);
+ return handleResultFromAttemptedLeaseObtainment(rs, eventTimeMillis);
+ }
+
+ // Extract values from result set
+ Timestamp dbEventTimestamp = resultSet.getTimestamp(1);
+ Timestamp dbLeaseAcquisitionTimestamp = resultSet.getTimestamp(2);
+ boolean isWithinEpsilon = resultSet.getBoolean(3);
+ int leaseValidityStatus = resultSet.getInt(4);
+ int dbLinger = resultSet.getInt(5);
+
+ // Lease is valid
+ if (leaseValidityStatus == 1) {
+ // CASE 2: Same event, lease is valid
+ if (isWithinEpsilon) {
+ // Utilize db timestamp for reminder
+ return new LeasedToAnotherStatus(dbEventTimestamp.getTime(),
+ dbLeaseAcquisitionTimestamp.getTime() + dbLinger);
+ }
+ // CASE 3: Distinct event, lease is valid
+ // Utilize db timestamp for wait time, but be reminded of own event
timestamp
+ return new LeasedToAnotherStatus(eventTimeMillis,
+ dbLeaseAcquisitionTimestamp.getTime() + dbLinger);
+ }
+ // CASE 4: Lease is out of date (regardless of whether same or distinct
event)
+ else if (leaseValidityStatus == 2) {
+ if (isWithinEpsilon) {
+ LOG.warn("Lease should not be out of date for the same trigger event
since epsilon << linger for flowAction"
+ + " {}, db eventTimestamp {}, db leaseAcquisitionTimestamp
{}, linger {}", flowAction,
+ dbEventTimestamp, dbLeaseAcquisitionTimestamp, dbLinger);
+ }
+ // Use our event to acquire lease, check for previous db
eventTimestamp and leaseAcquisitionTimestamp
+ ResultSet rs = withPreparedStatement(
+
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT,
this.leaseArbiterTableName,
+ this.leaseArbiterTableName, this.constantsTableName),
+ updateStatement -> {
+ completeUpdatePreparedStatement(updateStatement, flowAction,
eventTimeMillis, true,
+ true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
+ return updateStatement.executeQuery();
+ }, true);
+ return handleResultFromAttemptedLeaseObtainment(rs, eventTimeMillis);
+ } // No longer leasing this event
+ // CASE 5: Same event, no longer leasing event in db: terminate
+ if (isWithinEpsilon) {
+ return new NoLongerLeasingStatus();
+ }
+ // CASE 6: Distinct event, no longer leasing event in db
+ // Use our event to acquire lease, check for previous db
eventTimestamp and NULL leaseAcquisitionTimestamp
+ ResultSet rs = withPreparedStatement(
+
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT,
this.leaseArbiterTableName,
+ this.leaseArbiterTableName, this.constantsTableName),
+ updateStatement -> {
+ completeUpdatePreparedStatement(updateStatement, flowAction,
eventTimeMillis, true,
+ false, dbEventTimestamp, null);
+ return updateStatement.executeQuery();
+ }, true);
+ return handleResultFromAttemptedLeaseObtainment(rs, eventTimeMillis);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Attempt lease by insert or update following a read based on the condition
the state of the table has not changed
+ * since the read. Parse the result to return the corresponding status based
on successful insert/update or not.
+ * @param resultSet
+ * @param eventTimeMillis
+ * @return LeaseAttemptStatus
+ * @throws SQLException
+ * @throws IOException
+ */
+ protected LeaseAttemptStatus
handleResultFromAttemptedLeaseObtainment(ResultSet resultSet, long
eventTimeMillis)
+ throws SQLException, IOException {
+ if (!resultSet.next()) {
+ throw new IOException("Expected num rows and lease_acquisition_timestamp
returned from query but received nothing");
+ }
+ int numRowsUpdated = resultSet.getInt(1);
+ long leaseAcquisitionTimeMillis = resultSet.getTimestamp(2).getTime();
Review Comment:
couldn't this be `null`, in which case `NoLongerLeasingStatus`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/SchedulerLeaseAlgoHandler.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Random;
+
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.LeaseAttemptStatus;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.scheduler.JobScheduler;
+import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.runtime.api.LeaseObtainedStatus;
+import org.apache.gobblin.runtime.api.LeasedToAnotherStatus;
+
+
+/**
+ * Handler used to coordinate multiple hosts with enabled schedulers to
respond to flow action events. It uses the
+ * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter} to
determine a single lease owner at a given time
+ * for a flow action event. After acquiring the lease, it persists the flow
action event to the {@link DagActionStore}
+ * to be eventually acted upon by the host with the active DagManager. Once it
has completed this action, it will mark
+ * the lease as completed by calling the
+ * {@link
org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter.completeLeaseUse}
method. Hosts that do not gain
+ * the lease for the event, instead schedule a reminder using the {@link
SchedulerService} to check back in on the
+ * previous lease owner's completion status after the lease should expire to
ensure the event is handled in failure
+ * cases.
+ */
+public class SchedulerLeaseAlgoHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(SchedulerLeaseAlgoHandler.class);
+ private final int staggerUpperBoundSec;
+ private static Random random = new Random();
+ protected MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+ protected JobScheduler jobScheduler;
+ protected SchedulerService schedulerService;
+ protected DagActionStore dagActionStore;
+ private MetricContext metricContext;
+ private ContextAwareMeter numLeasesCompleted;
+ @Inject
+ public SchedulerLeaseAlgoHandler(Config config, MultiActiveLeaseArbiter
leaseDeterminationStore,
+ JobScheduler jobScheduler, SchedulerService schedulerService,
DagActionStore dagActionStore) {
+ this.staggerUpperBoundSec = ConfigUtils.getInt(config,
+ ConfigurationKeys.SCHEDULER_STAGGERING_UPPER_BOUND_SEC_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_STAGGERING_UPPER_BOUND_SEC);
+ this.multiActiveLeaseArbiter = leaseDeterminationStore;
+ this.jobScheduler = jobScheduler;
+ this.schedulerService = schedulerService;
+ this.dagActionStore = dagActionStore;
+ this.metricContext = Instrumented.getMetricContext(new
org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
+ this.getClass());
+ this.numLeasesCompleted =
metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_SCHEDULER_LEASE_ALGO_HANDLER_NUM_LEASES_COMPLETED);
+ }
+
+ /**
+ * This method is used in the multi-active scheduler case for one or more
hosts to respond to a flow action event
+ * by attempting a lease for the flow event and processing the result
depending on the status of the attempt.
+ * @param jobProps
+ * @param flowAction
+ * @param eventTimeMillis
+ * @throws IOException
+ */
+ public void handleNewSchedulerEvent(Properties jobProps,
DagActionStore.DagAction flowAction, long eventTimeMillis)
+ throws IOException {
+ LeaseAttemptStatus leaseAttemptStatus =
+ multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
+ // TODO: add a log event or metric for each of these cases
+ switch (leaseAttemptStatus.getClass().getSimpleName()) {
+ case "LeaseObtainedStatus":
+ finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
+ break;
+ case "LeasedToAnotherStatus":
+ scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus)
leaseAttemptStatus, flowAction, eventTimeMillis);
+ break;
+ case "NoLongerLeasingStatus":
+ break;
+ default:
+ }
+ }
+
+ // Called after obtaining a lease to persist the flow action to {@link
DagActionStore} and mark the lease as done
+ private boolean finalizeLease(LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
+ try {
+ this.dagActionStore.addDagAction(flowAction.getFlowGroup(),
flowAction.getFlowName(),
+ flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
+ if (this.dagActionStore.exists(flowAction.getFlowGroup(),
flowAction.getFlowName(),
+ flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
Review Comment:
just wondering: why can't `addDagAction` provide a guarantee that it will
have succeeded (or else throws an exception). i.e. why the need to immediately
check `exists` to confirm?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/SchedulerLeaseAlgoHandler.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Random;
+
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.LeaseAttemptStatus;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.scheduler.JobScheduler;
+import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.runtime.api.LeaseObtainedStatus;
+import org.apache.gobblin.runtime.api.LeasedToAnotherStatus;
+
+
+/**
+ * Handler used to coordinate multiple hosts with enabled schedulers to
respond to flow action events. It uses the
+ * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter} to
determine a single lease owner at a given time
+ * for a flow action event. After acquiring the lease, it persists the flow
action event to the {@link DagActionStore}
+ * to be eventually acted upon by the host with the active DagManager. Once it
has completed this action, it will mark
+ * the lease as completed by calling the
+ * {@link
org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter.completeLeaseUse}
method. Hosts that do not gain
+ * the lease for the event, instead schedule a reminder using the {@link
SchedulerService} to check back in on the
+ * previous lease owner's completion status after the lease should expire to
ensure the event is handled in failure
+ * cases.
+ */
+public class SchedulerLeaseAlgoHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(SchedulerLeaseAlgoHandler.class);
+ private final int staggerUpperBoundSec;
+ private static Random random = new Random();
+ protected MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+ protected JobScheduler jobScheduler;
+ protected SchedulerService schedulerService;
+ protected DagActionStore dagActionStore;
+ private MetricContext metricContext;
+ private ContextAwareMeter numLeasesCompleted;
+ @Inject
+ public SchedulerLeaseAlgoHandler(Config config, MultiActiveLeaseArbiter
leaseDeterminationStore,
+ JobScheduler jobScheduler, SchedulerService schedulerService,
DagActionStore dagActionStore) {
+ this.staggerUpperBoundSec = ConfigUtils.getInt(config,
+ ConfigurationKeys.SCHEDULER_STAGGERING_UPPER_BOUND_SEC_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_STAGGERING_UPPER_BOUND_SEC);
+ this.multiActiveLeaseArbiter = leaseDeterminationStore;
+ this.jobScheduler = jobScheduler;
+ this.schedulerService = schedulerService;
+ this.dagActionStore = dagActionStore;
+ this.metricContext = Instrumented.getMetricContext(new
org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
+ this.getClass());
+ this.numLeasesCompleted =
metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_SCHEDULER_LEASE_ALGO_HANDLER_NUM_LEASES_COMPLETED);
+ }
+
+ /**
+ * This method is used in the multi-active scheduler case for one or more
hosts to respond to a flow action event
+ * by attempting a lease for the flow event and processing the result
depending on the status of the attempt.
+ * @param jobProps
+ * @param flowAction
+ * @param eventTimeMillis
+ * @throws IOException
+ */
+ public void handleNewSchedulerEvent(Properties jobProps,
DagActionStore.DagAction flowAction, long eventTimeMillis)
+ throws IOException {
+ LeaseAttemptStatus leaseAttemptStatus =
+ multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
+ // TODO: add a log event or metric for each of these cases
+ switch (leaseAttemptStatus.getClass().getSimpleName()) {
+ case "LeaseObtainedStatus":
+ finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
+ break;
+ case "LeasedToAnotherStatus":
+ scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus)
leaseAttemptStatus, flowAction, eventTimeMillis);
+ break;
+ case "NoLongerLeasingStatus":
+ break;
+ default:
+ }
+ }
+
+ // Called after obtaining a lease to persist the flow action to {@link
DagActionStore} and mark the lease as done
+ private boolean finalizeLease(LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
Review Comment:
name this `launchFlow`?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/SchedulerLeaseAlgoHandler.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Random;
+
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.LeaseAttemptStatus;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.scheduler.JobScheduler;
+import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.runtime.api.LeaseObtainedStatus;
+import org.apache.gobblin.runtime.api.LeasedToAnotherStatus;
+
+
+/**
+ * Handler used to coordinate multiple hosts with enabled schedulers to
respond to flow action events. It uses the
+ * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter} to
determine a single lease owner at a given time
+ * for a flow action event. After acquiring the lease, it persists the flow
action event to the {@link DagActionStore}
+ * to be eventually acted upon by the host with the active DagManager. Once it
has completed this action, it will mark
+ * the lease as completed by calling the
+ * {@link
org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter.completeLeaseUse}
method. Hosts that do not gain
+ * the lease for the event, instead schedule a reminder using the {@link
SchedulerService} to check back in on the
+ * previous lease owner's completion status after the lease should expire to
ensure the event is handled in failure
+ * cases.
+ */
+public class SchedulerLeaseAlgoHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(SchedulerLeaseAlgoHandler.class);
+ private final int staggerUpperBoundSec;
+ private static Random random = new Random();
+ protected MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+ protected JobScheduler jobScheduler;
+ protected SchedulerService schedulerService;
+ protected DagActionStore dagActionStore;
+ private MetricContext metricContext;
+ private ContextAwareMeter numLeasesCompleted;
+ @Inject
+ public SchedulerLeaseAlgoHandler(Config config, MultiActiveLeaseArbiter
leaseDeterminationStore,
+ JobScheduler jobScheduler, SchedulerService schedulerService,
DagActionStore dagActionStore) {
+ this.staggerUpperBoundSec = ConfigUtils.getInt(config,
+ ConfigurationKeys.SCHEDULER_STAGGERING_UPPER_BOUND_SEC_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_STAGGERING_UPPER_BOUND_SEC);
+ this.multiActiveLeaseArbiter = leaseDeterminationStore;
+ this.jobScheduler = jobScheduler;
+ this.schedulerService = schedulerService;
+ this.dagActionStore = dagActionStore;
+ this.metricContext = Instrumented.getMetricContext(new
org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
+ this.getClass());
+ this.numLeasesCompleted =
metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_SCHEDULER_LEASE_ALGO_HANDLER_NUM_LEASES_COMPLETED);
+ }
+
+ /**
+ * This method is used in the multi-active scheduler case for one or more
hosts to respond to a flow action event
+ * by attempting a lease for the flow event and processing the result
depending on the status of the attempt.
+ * @param jobProps
+ * @param flowAction
+ * @param eventTimeMillis
+ * @throws IOException
+ */
+ public void handleNewSchedulerEvent(Properties jobProps,
DagActionStore.DagAction flowAction, long eventTimeMillis)
+ throws IOException {
+ LeaseAttemptStatus leaseAttemptStatus =
+ multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
+ // TODO: add a log event or metric for each of these cases
+ switch (leaseAttemptStatus.getClass().getSimpleName()) {
+ case "LeaseObtainedStatus":
+ finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
+ break;
+ case "LeasedToAnotherStatus":
+ scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus)
leaseAttemptStatus, flowAction, eventTimeMillis);
+ break;
+ case "NoLongerLeasingStatus":
+ break;
+ default:
+ }
+ }
+
+ // Called after obtaining a lease to persist the flow action to {@link
DagActionStore} and mark the lease as done
+ private boolean finalizeLease(LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
+ try {
+ this.dagActionStore.addDagAction(flowAction.getFlowGroup(),
flowAction.getFlowName(),
+ flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
+ if (this.dagActionStore.exists(flowAction.getFlowGroup(),
flowAction.getFlowName(),
+ flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
+ // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
+ this.numLeasesCompleted.mark();
+ return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction,
status.getEventTimestamp(),
+ status.getMyLeaseAcquisitionTimestamp());
+ }
+ } catch (IOException | SQLException e) {
+ throw new RuntimeException(e);
+ }
+ // TODO: should this return an error or print a warning log if failed to
commit to dag action store?
+ return false;
+ }
+
+ /**
+ * This method is used by {@link
SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for
itself
+ * to check on the other participant's progress to finish acting on a flow
action after the time the lease should
+ * expire.
+ * @param jobProps
+ * @param status used to extract event to be reminded for and the minimum
time after which reminder should occur
+ * @param originalEventTimeMillis the event timestamp we were originally
handling
+ * @param flowAction
+ */
+ private void scheduleReminderForEvent(Properties jobProps,
LeasedToAnotherStatus status,
+ DagActionStore.DagAction flowAction, long originalEventTimeMillis) {
+ // Add a small randomization to the minimum reminder wait time to avoid
'thundering herd' issue
+ String cronExpression =
createCronFromDelayPeriod(status.getMinimumReminderWaitMillis() +
random.nextInt(staggerUpperBoundSec));
+ jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
+ // Ensure we save the event timestamp that we're setting reminder for, in
addition to our own event timestamp which may be different
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY,
String.valueOf(status.getReminderEventTimeMillis()));
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY,
String.valueOf(status.getReminderEventTimeMillis()));
+ JobKey key = new JobKey(flowAction.getFlowName(),
flowAction.getFlowGroup());
+ Trigger trigger = this.jobScheduler.getTrigger(key, jobProps);
Review Comment:
sorry, I guess I'm unfamiliar: what's this `Trigger` we get from one
scheduler and give to another?
Issue Time Tracking
-------------------
Worklog Id: (was: 864378)
Time Spent: 9.5h (was: 9h 20m)
> Implement multi-active, non blocking for leader host
> ----------------------------------------------------
>
> Key: GOBBLIN-1837
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1837
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 9.5h
> Remaining Estimate: 0h
>
> This task will include the implementation of non-blocking, multi-active
> scheduler for each host. It will NOT include metric emission or unit tests
> for validation. That will be done in a separate follow-up ticket. The work in
> this ticket includes
> * define a table to do scheduler lease determination for each flow's trigger
> event and related methods to execute actions on this tableĀ
> * update DagActionStore schema and DagActionStoreMonitor to act upon new
> "LAUNCH" type events in addition to KILL/RESUME
> * update scheduler/orchestrator logic to apply the non-blocking algorithm
> when "multi-active scheduler mode" is enabled, otherwise submit events
> directly to the DagManager after receiving a scheduler trigger
> * implement the non-blocking algorithm, particularly handling reminder
> events if another host is in the process of securing the lease for a
> particular flow trigger
--
This message was sent by Atlassian Jira
(v8.20.10#820010)