phet commented on code in PR #3700: URL: https://github.com/apache/gobblin/pull/3700#discussion_r1211180809
########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlSchedulerLeaseDeterminationStore.java: ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.api; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; + +import com.google.inject.Inject; +import com.typesafe.config.Config; + +import javax.sql.DataSource; + +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metastore.MysqlDataSourceFactory; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.util.ConfigUtils; + + +public class MysqlSchedulerLeaseDeterminationStore implements SchedulerLeaseDeterminationStore { + public static final String CONFIG_PREFIX = "MysqlSchedulerLeaseDeterminationStore"; + + protected final DataSource dataSource; + private final DagActionStore dagActionStore; + private final String tableName; + private final long epsilon; + private final long linger; Review Comment: suggest naming these w/ a unit suffix, 
like `Millis`, etc. ... but wait! shouldn't these be values stored in the DB and referenced from queries, rather than held in java-space, where there's no guarantee they would always be exactly the same for all hosts? it is all but impossible to debug a distributed algo, if the participants are permitted to "drift", and thereby reach inconsistent conclusions about the same inputs at a given point in time. ########## gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java: ########## @@ -95,6 +95,24 @@ public class ConfigurationKeys { public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500; public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = "skip.scheduling.flows.after.num.days"; public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365; + // Scheduler lease determination store configuration + // TODO: multiActiveScheduler change here update values for the following keys and rename to more meaningful + public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_JDBC_DRIVER_KEY = "state.store.db.jdbc.driver"; + public static final String DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_JDBC_DRIVER = "com.mysql.cj.jdbc.Driver"; Review Comment: this constant is already in a few other places. could we reference one here rather than repeating? e.g. 
`DEFAULT_STATE_STORE_DB_JDBC_DRIVER` ########## gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java: ########## @@ -95,6 +95,24 @@ public class ConfigurationKeys { public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500; public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = "skip.scheduling.flows.after.num.days"; public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365; + // Scheduler lease determination store configuration + // TODO: multiActiveScheduler change here update values for the following keys and rename to more meaningful + public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_JDBC_DRIVER_KEY = "state.store.db.jdbc.driver"; + public static final String DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_JDBC_DRIVER = "com.mysql.cj.jdbc.Driver"; + public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_URL_KEY = "state.store.db.url"; + public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_USER_KEY = "state.store.db.user"; + public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_PASSWORD_KEY = "state.store.db.password"; + public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY = "state.store.db.table"; Review Comment: is it intentional that these would "borrow" the same key as others, like: ``` public static final String STATE_STORE_DB_URL_KEY = "state.store.db.url"; public static final String STATE_STORE_DB_USER_KEY = "state.store.db.user"; ``` (just 20 or so lines above)? doing so seems very confusing ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlSchedulerLeaseDeterminationStore.java: ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.api; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; + +import com.google.inject.Inject; +import com.typesafe.config.Config; + +import javax.sql.DataSource; + +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metastore.MysqlDataSourceFactory; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.util.ConfigUtils; + + +public class MysqlSchedulerLeaseDeterminationStore implements SchedulerLeaseDeterminationStore { + public static final String CONFIG_PREFIX = "MysqlSchedulerLeaseDeterminationStore"; + + protected final DataSource dataSource; + private final DagActionStore dagActionStore; + private final String tableName; + private final long epsilon; + private final long linger; + /* TODO: + - define retention on this table + - initialize table with epsilon and linger if one already doesn't exist using these configs + - join with table above to ensure epsilon/linger values are consistent across hosts (in case hosts are deployed with different configs) + */ + protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE flow_group=? AND flow_name=? AND flow_execution_id=? 
" + + "AND flow_action=? AND ABS(trigger_event_timestamp-?) <= %s"; + protected static final String ATTEMPT_INSERT_AND_GET_PURSUANT_TIMESTAMP_STATEMENT = "INSERT INTO %s (flow_group, " + + "flow_name, flow_execution_id, flow_action, trigger_event_timestamp) VALUES (?, ?, ?, ?, ?) WHERE NOT EXISTS (" + + "SELECT * FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW + "; SELECT ROW_COUNT() AS rows_inserted_count, " + + "pursuant_timestamp FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW; + + protected static final String UPDATE_PURSUANT_TIMESTAMP_STATEMENT = "UPDATE %s SET pursuant_timestamp = NULL " + + WHERE_CLAUSE_TO_MATCH_ROW; + private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %S (" + "flow_group varchar(" + + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar(" + + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + "flow_execution_id varchar(" + + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, flow_action varchar(100) NOT NULL, " + + "trigger_event_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, " + + "pursuant_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP," + + "PRIMARY KEY (flow_group,flow_name,flow_execution_id,flow_action,trigger_event_timestamp)"; Review Comment: why should `trigger_event_timestamp` be part of the primary key? rather it seems an attribute of the record identified by the other four fields. ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlSchedulerLeaseDeterminationStore.java: ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.api; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; + +import com.google.inject.Inject; +import com.typesafe.config.Config; + +import javax.sql.DataSource; + +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metastore.MysqlDataSourceFactory; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.util.ConfigUtils; + + +public class MysqlSchedulerLeaseDeterminationStore implements SchedulerLeaseDeterminationStore { + public static final String CONFIG_PREFIX = "MysqlSchedulerLeaseDeterminationStore"; + + protected final DataSource dataSource; + private final DagActionStore dagActionStore; + private final String tableName; + private final long epsilon; + private final long linger; + /* TODO: + - define retention on this table + - initialize table with epsilon and linger if one already doesn't exist using these configs + - join with table above to ensure epsilon/linger values are consistent across hosts (in case hosts are deployed with different configs) + */ + protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE flow_group=? AND flow_name=? AND flow_execution_id=? " + + "AND flow_action=? AND ABS(trigger_event_timestamp-?) 
<= %s"; + protected static final String ATTEMPT_INSERT_AND_GET_PURSUANT_TIMESTAMP_STATEMENT = "INSERT INTO %s (flow_group, " + + "flow_name, flow_execution_id, flow_action, trigger_event_timestamp) VALUES (?, ?, ?, ?, ?) WHERE NOT EXISTS (" + + "SELECT * FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW + "; SELECT ROW_COUNT() AS rows_inserted_count, " + + "pursuant_timestamp FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW; + + protected static final String UPDATE_PURSUANT_TIMESTAMP_STATEMENT = "UPDATE %s SET pursuant_timestamp = NULL " + + WHERE_CLAUSE_TO_MATCH_ROW; + private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %S (" + "flow_group varchar(" + + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar(" + + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + "flow_execution_id varchar(" + + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, flow_action varchar(100) NOT NULL, " + + "trigger_event_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, " Review Comment: nit: there's no reason to limit this impl to being about solely trigger events. (it could be instead for resume events). hence I suggest: naming simply `event_timestamp` ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlSchedulerLeaseDeterminationStore.java: ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.api; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; + +import com.google.inject.Inject; +import com.typesafe.config.Config; + +import javax.sql.DataSource; + +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metastore.MysqlDataSourceFactory; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.util.ConfigUtils; + + +public class MysqlSchedulerLeaseDeterminationStore implements SchedulerLeaseDeterminationStore { + public static final String CONFIG_PREFIX = "MysqlSchedulerLeaseDeterminationStore"; + + protected final DataSource dataSource; + private final DagActionStore dagActionStore; + private final String tableName; + private final long epsilon; + private final long linger; + /* TODO: + - define retention on this table + - initialize table with epsilon and linger if one already doesn't exist using these configs + - join with table above to ensure epsilon/linger values are consistent across hosts (in case hosts are deployed with different configs) + */ + protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE flow_group=? AND flow_name=? AND flow_execution_id=? " + + "AND flow_action=? AND ABS(trigger_event_timestamp-?) 
<= %s"; + protected static final String ATTEMPT_INSERT_AND_GET_PURSUANT_TIMESTAMP_STATEMENT = "INSERT INTO %s (flow_group, " + + "flow_name, flow_execution_id, flow_action, trigger_event_timestamp) VALUES (?, ?, ?, ?, ?) WHERE NOT EXISTS (" + + "SELECT * FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW + "; SELECT ROW_COUNT() AS rows_inserted_count, " + + "pursuant_timestamp FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW; Review Comment: is there a missing close parens here? I'm having trouble mentally parsing... ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlSchedulerLeaseDeterminationStore.java: ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.runtime.api; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; + +import com.google.inject.Inject; +import com.typesafe.config.Config; + +import javax.sql.DataSource; + +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metastore.MysqlDataSourceFactory; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.util.ConfigUtils; + + +public class MysqlSchedulerLeaseDeterminationStore implements SchedulerLeaseDeterminationStore { + public static final String CONFIG_PREFIX = "MysqlSchedulerLeaseDeterminationStore"; + + protected final DataSource dataSource; + private final DagActionStore dagActionStore; + private final String tableName; + private final long epsilon; + private final long linger; + /* TODO: + - define retention on this table + - initialize table with epsilon and linger if one already doesn't exist using these configs + - join with table above to ensure epsilon/linger values are consistent across hosts (in case hosts are deployed with different configs) + */ + protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE flow_group=? AND flow_name=? AND flow_execution_id=? " + + "AND flow_action=? AND ABS(trigger_event_timestamp-?) <= %s"; + protected static final String ATTEMPT_INSERT_AND_GET_PURSUANT_TIMESTAMP_STATEMENT = "INSERT INTO %s (flow_group, " + + "flow_name, flow_execution_id, flow_action, trigger_event_timestamp) VALUES (?, ?, ?, ?, ?) 
WHERE NOT EXISTS (" + + "SELECT * FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW + "; SELECT ROW_COUNT() AS rows_inserted_count, " + + "pursuant_timestamp FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW; + + protected static final String UPDATE_PURSUANT_TIMESTAMP_STATEMENT = "UPDATE %s SET pursuant_timestamp = NULL " + + WHERE_CLAUSE_TO_MATCH_ROW; + private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %S (" + "flow_group varchar(" + + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar(" + + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + "flow_execution_id varchar(" + + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, flow_action varchar(100) NOT NULL, " + + "trigger_event_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, " + + "pursuant_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP," Review Comment: as mentioned, 'pursuing' is likely to be clearer and more familiar than 'pursuant'. `lease_timestamp` or `lease_acquisition_timestamp` are other possibilities. ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java: ########## @@ -91,21 +89,20 @@ public UpdateResponse delete(ComplexResourceKey<org.apache.gobblin.service.FlowS String flowName = key.getKey().getFlowName(); Long flowExecutionId = key.getKey().getFlowExecutionId(); try { - // If an existing resume or kill request is still pending then do not accept this request - if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId.toString())) { - DagActionStore.DagActionValue action = this.dagActionStore.getDagAction(flowGroup, flowName, flowExecutionId.toString()).getDagActionValue(); - this.handleException(flowGroup, flowName, flowExecutionId.toString(), - new RuntimeException("There is already a pending " + action + " action for this flow. 
Please wait to resubmit and wait for" + // If an existing kill request is still pending then do not accept this request + if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.DagActionValue.KILL)) { + this.handleException(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.DagActionValue.KILL, + new RuntimeException("There is already a pending KILL action for this flow. Please wait to resubmit and wait for" Review Comment: feels like déjà vu... I believe I just read this code above, but w/ `s/RESUME/KILL/g`. if so, can't we DRY it up? ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlSchedulerLeaseDeterminationStore.java: ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.runtime.api; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; + +import com.google.inject.Inject; +import com.typesafe.config.Config; + +import javax.sql.DataSource; + +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metastore.MysqlDataSourceFactory; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.util.ConfigUtils; + + +public class MysqlSchedulerLeaseDeterminationStore implements SchedulerLeaseDeterminationStore { + public static final String CONFIG_PREFIX = "MysqlSchedulerLeaseDeterminationStore"; + + protected final DataSource dataSource; + private final DagActionStore dagActionStore; + private final String tableName; + private final long epsilon; + private final long linger; + /* TODO: + - define retention on this table + - initialize table with epsilon and linger if one already doesn't exist using these configs + - join with table above to ensure epsilon/linger values are consistent across hosts (in case hosts are deployed with different configs) + */ + protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE flow_group=? AND flow_name=? AND flow_execution_id=? " + + "AND flow_action=? AND ABS(trigger_event_timestamp-?) <= %s"; Review Comment: this doesn't necessarily pinpoint a row, so much as constrain what could be multiple rows. I suggest leaving it as only the first four comparisons, which are all exact equality, and leaving off the `ABS`/`<=` relative cmp here. the latter can be added in the few instances where the specific value of `trigger_event_timestamp` is unknown. 
########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlSchedulerLeaseDeterminationStore.java: ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.api; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; + +import com.google.inject.Inject; +import com.typesafe.config.Config; + +import javax.sql.DataSource; + +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metastore.MysqlDataSourceFactory; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.util.ConfigUtils; + + +public class MysqlSchedulerLeaseDeterminationStore implements SchedulerLeaseDeterminationStore { + public static final String CONFIG_PREFIX = "MysqlSchedulerLeaseDeterminationStore"; + + protected final DataSource dataSource; + private final DagActionStore dagActionStore; + private final String tableName; + private final long epsilon; + private final long linger; + /* TODO: + - define retention on this table + - 
initialize table with epsilon and linger if one already doesn't exist using these configs + - join with table above to ensure epsilon/linger values are consistent across hosts (in case hosts are deployed with different configs) + */ + protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE flow_group=? AND flow_name=? AND flow_execution_id=? " + + "AND flow_action=? AND ABS(trigger_event_timestamp-?) <= %s"; + protected static final String ATTEMPT_INSERT_AND_GET_PURSUANT_TIMESTAMP_STATEMENT = "INSERT INTO %s (flow_group, " + + "flow_name, flow_execution_id, flow_action, trigger_event_timestamp) VALUES (?, ?, ?, ?, ?) WHERE NOT EXISTS (" + + "SELECT * FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW + "; SELECT ROW_COUNT() AS rows_inserted_count, " + + "pursuant_timestamp FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW; + + protected static final String UPDATE_PURSUANT_TIMESTAMP_STATEMENT = "UPDATE %s SET pursuant_timestamp = NULL " + + WHERE_CLAUSE_TO_MATCH_ROW; + private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %S (" + "flow_group varchar(" + + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar(" + + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + "flow_execution_id varchar(" + + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, flow_action varchar(100) NOT NULL, " + + "trigger_event_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, " + + "pursuant_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP," + + "PRIMARY KEY (flow_group,flow_name,flow_execution_id,flow_action,trigger_event_timestamp)"; + + @Inject + public MysqlSchedulerLeaseDeterminationStore(Config config, DagActionStore dagActionStore) throws IOException { + if (config.hasPath(CONFIG_PREFIX)) { + config = config.getConfig(CONFIG_PREFIX).withFallback(config); + } else { + throw new IOException("Please specify the config for MysqlSchedulerLeaseDeterminationStore"); + } + + this.tableName = ConfigUtils.getString(config, 
ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY, + ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE); + this.epsilon = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS_KEY, + ConfigurationKeys.DEFAULT_SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS); + this.linger = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS_KEY, + ConfigurationKeys.DEFAULT_SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS); + + this.dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker()); + try (Connection connection = dataSource.getConnection(); + PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) { + createStatement.executeUpdate(); + connection.commit(); + } catch (SQLException e) { + throw new IOException("Table creation failure for " + tableName, e); + } + this.dagActionStore = dagActionStore; + } + + @Override + public LeaseAttemptStatus attemptInsertAndGetPursuantTimestamp(String flowGroup, String flowName, + String flowExecutionId, FlowActionType flowActionType, long triggerTimeMillis) + throws IOException { + Timestamp triggerTimestamp = new Timestamp(triggerTimeMillis); + try (Connection connection = this.dataSource.getConnection(); + PreparedStatement insertStatement = connection.prepareStatement( + String.format(ATTEMPT_INSERT_AND_GET_PURSUANT_TIMESTAMP_STATEMENT, tableName, tableName, epsilon, tableName, + epsilon))) { + int i = 0; + // Values to set in new row + insertStatement.setString(++i, flowGroup); + insertStatement.setString(++i, flowName); + insertStatement.setString(++i, flowExecutionId); + insertStatement.setString(++i, flowActionType.toString()); + insertStatement.setTimestamp(++i, triggerTimestamp); + // Values to check if existing row matches + insertStatement.setString(++i, flowGroup); + insertStatement.setString(++i, flowName); + insertStatement.setString(++i, 
flowExecutionId); + insertStatement.setString(++i, flowActionType.toString()); + insertStatement.setTimestamp(++i, triggerTimestamp); + // Values to make select statement to read row + insertStatement.setString(++i, flowGroup); + insertStatement.setString(++i, flowName); + insertStatement.setString(++i, flowExecutionId); + insertStatement.setString(++i, flowActionType.toString()); + insertStatement.setTimestamp(++i, triggerTimestamp); Review Comment: I'm in favor of raising the level of abstraction here. no need necessarily to follow the pattern of `completeInsertPreparedStatement`, but that is one way to do so - https://github.com/apache/gobblin/blob/51a852d506b749b9ac33568aff47105e14972a57/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java#L112 ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlSchedulerLeaseDeterminationStore.java: ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.runtime.api; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; + +import com.google.inject.Inject; +import com.typesafe.config.Config; + +import javax.sql.DataSource; + +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metastore.MysqlDataSourceFactory; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.util.ConfigUtils; + + +public class MysqlSchedulerLeaseDeterminationStore implements SchedulerLeaseDeterminationStore { + public static final String CONFIG_PREFIX = "MysqlSchedulerLeaseDeterminationStore"; + + protected final DataSource dataSource; + private final DagActionStore dagActionStore; + private final String tableName; + private final long epsilon; + private final long linger; + /* TODO: + - define retention on this table + - initialize table with epsilon and linger if one already doesn't exist using these configs + - join with table above to ensure epsilon/linger values are consistent across hosts (in case hosts are deployed with different configs) + */ + protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE flow_group=? AND flow_name=? AND flow_execution_id=? " + + "AND flow_action=? AND ABS(trigger_event_timestamp-?) <= %s"; + protected static final String ATTEMPT_INSERT_AND_GET_PURSUANT_TIMESTAMP_STATEMENT = "INSERT INTO %s (flow_group, " + + "flow_name, flow_execution_id, flow_action, trigger_event_timestamp) VALUES (?, ?, ?, ?, ?) 
WHERE NOT EXISTS (" + + "SELECT * FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW + "; SELECT ROW_COUNT() AS rows_inserted_count, " + + "pursuant_timestamp FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW; + + protected static final String UPDATE_PURSUANT_TIMESTAMP_STATEMENT = "UPDATE %s SET pursuant_timestamp = NULL " + + WHERE_CLAUSE_TO_MATCH_ROW; + private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %S (" + "flow_group varchar(" + + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar(" + + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + "flow_execution_id varchar(" + + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, flow_action varchar(100) NOT NULL, " + + "trigger_event_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, " + + "pursuant_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP," + + "PRIMARY KEY (flow_group,flow_name,flow_execution_id,flow_action,trigger_event_timestamp)"; + + @Inject + public MysqlSchedulerLeaseDeterminationStore(Config config, DagActionStore dagActionStore) throws IOException { + if (config.hasPath(CONFIG_PREFIX)) { + config = config.getConfig(CONFIG_PREFIX).withFallback(config); + } else { + throw new IOException("Please specify the config for MysqlSchedulerLeaseDeterminationStore"); + } + + this.tableName = ConfigUtils.getString(config, ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY, + ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE); + this.epsilon = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS_KEY, + ConfigurationKeys.DEFAULT_SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS); + this.linger = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS_KEY, + ConfigurationKeys.DEFAULT_SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS); + + this.dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker()); + try (Connection connection = dataSource.getConnection(); + 
PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) { + createStatement.executeUpdate(); + connection.commit(); + } catch (SQLException e) { + throw new IOException("Table creation failure for " + tableName, e); + } + this.dagActionStore = dagActionStore; + } + + @Override + public LeaseAttemptStatus attemptInsertAndGetPursuantTimestamp(String flowGroup, String flowName, + String flowExecutionId, FlowActionType flowActionType, long triggerTimeMillis) + throws IOException { + Timestamp triggerTimestamp = new Timestamp(triggerTimeMillis); + try (Connection connection = this.dataSource.getConnection(); + PreparedStatement insertStatement = connection.prepareStatement( + String.format(ATTEMPT_INSERT_AND_GET_PURSUANT_TIMESTAMP_STATEMENT, tableName, tableName, epsilon, tableName, + epsilon))) { + int i = 0; + // Values to set in new row + insertStatement.setString(++i, flowGroup); + insertStatement.setString(++i, flowName); + insertStatement.setString(++i, flowExecutionId); + insertStatement.setString(++i, flowActionType.toString()); + insertStatement.setTimestamp(++i, triggerTimestamp); + // Values to check if existing row matches + insertStatement.setString(++i, flowGroup); + insertStatement.setString(++i, flowName); + insertStatement.setString(++i, flowExecutionId); + insertStatement.setString(++i, flowActionType.toString()); + insertStatement.setTimestamp(++i, triggerTimestamp); + // Values to make select statement to read row + insertStatement.setString(++i, flowGroup); + insertStatement.setString(++i, flowName); + insertStatement.setString(++i, flowExecutionId); + insertStatement.setString(++i, flowActionType.toString()); + insertStatement.setTimestamp(++i, triggerTimestamp); + ResultSet resultSet = insertStatement.executeQuery(); + connection.commit(); + + if (!resultSet.next()) { + resultSet.close(); + throw new IOException(String.format("Unexpected error where no result returned while trying to 
obtain lease. " + + "This error indicates that no entry existed for trigger flow event for table %s flow group: %s, flow " + + "name: %s flow execution id: %s and trigger timestamp: %s when one should have been inserted", + tableName, flowGroup, flowName, flowExecutionId, triggerTimestamp)); + } + // If a row was inserted, then we have obtained the lease + int rowsUpdated = resultSet.getInt(1); + if (rowsUpdated == 1) { + // If the pursuing flow launch has been persisted to the {@link DagActionStore} we have completed lease obtainment + this.dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.LAUNCH); + if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.LAUNCH)) { + if (updatePursuantTimestamp(flowGroup, flowName, flowExecutionId, flowActionType, triggerTimestamp)) { + // TODO: potentially add metric here to count number of flows scheduled by each scheduler + LOG.info("Host completed obtaining lease for flow group: %s, flow name: %s flow execution id: %s and " + + "trigger timestamp: %s", flowGroup, flowName, flowExecutionId, triggerTimestamp); + resultSet.close(); + return LeaseAttemptStatus.LEASE_OBTAINED; + } else { + LOG.warn("Unable to update pursuant timestamp after persisting flow launch to DagActionStore for flow " + + "group: %s, flow name: %s flow execution id: %s and trigger timestamp: %s.", flowGroup, flowName, + flowExecutionId, triggerTimestamp); + } + } else { + LOG.warn("Did not find flow launch action in DagActionStore after adding it for flow group: %s, flow name: " + + "%s flow execution id: %s and trigger timestamp: %s.", flowGroup, flowName, flowExecutionId, + triggerTimestamp); + } + } else if (rowsUpdated > 1) { + resultSet.close(); + throw new IOException(String.format("Expect at most 1 row in table for a given trigger event. 
%s rows " + + "exist for the trigger flow event for table %s flow group: %s, flow name: %s flow execution id: %s " + + "and trigger timestamp: %s.", i, tableName, flowGroup, flowName, flowExecutionId, triggerTimestamp)); + } + Timestamp pursuantTimestamp = resultSet.getTimestamp(2); + resultSet.close(); + long currentTimeMillis = System.currentTimeMillis(); + // Another host has obtained lease and no further steps required + if (pursuantTimestamp == null) { + LOG.info("Another host has already successfully obtained lease for flow group: %s, flow name: %s flow execution " + + "id: %s and trigger timestamp: %s", flowGroup, flowName, flowExecutionId, triggerTimeMillis); + return LeaseAttemptStatus.LEASE_OBTAINED; + } else if (pursuantTimestamp.getTime() + linger <= currentTimeMillis) { + return LeaseAttemptStatus.PREVIOUS_LEASE_EXPIRED; + } + // Previous lease owner still has valid lease (pursuant + linger > current timestamp) + return LeaseAttemptStatus.PREVIOUS_LEASE_VALID; + } catch (SQLException e) { + throw new IOException(String.format("Error encountered while trying to obtain lease on trigger flow event for " + + "table %s flow group: %s, flow name: %s flow execution id: %s and trigger timestamp: %s", tableName, + flowGroup, flowName, flowExecutionId, triggerTimestamp), e); + } + } + + @Override + public boolean updatePursuantTimestamp(String flowGroup, String flowName, String flowExecutionId, + FlowActionType flowActionType, Timestamp triggerTimestamp) + throws IOException { + try (Connection connection = this.dataSource.getConnection(); + PreparedStatement updateStatement = connection.prepareStatement( + String.format(UPDATE_PURSUANT_TIMESTAMP_STATEMENT, tableName, epsilon))) { + int i = 0; + updateStatement.setString(++i, flowGroup); + updateStatement.setString(++i, flowName); + updateStatement.setString(++i, flowExecutionId); + updateStatement.setString(++i, flowActionType.toString()); + updateStatement.setTimestamp(++i, triggerTimestamp); + i = 
updateStatement.executeUpdate(); + connection.commit(); + + if (i != 1) { + LOG.warn("Expected to update 1 row's pursuant timestamp for a flow trigger event but instead updated {}", i); + } + return i >= 1; + } catch (SQLException e) { + throw new IOException(String.format("Encountered exception while trying to update pursuant timestamp to null for " + + "flowGroup: %s flowName: %s flowExecutionId: %s flowAction: %s triggerTimestamp: %s. Exception is %s", + flowGroup, flowName, flowExecutionId, flowActionType, triggerTimestamp, e)); Review Comment: to preserve the stacktrace of the orig exception, we'd want the two-arg version of `IOException` ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlSchedulerLeaseDeterminationStore.java: ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.runtime.api; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; + +import com.google.inject.Inject; +import com.typesafe.config.Config; + +import javax.sql.DataSource; + +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metastore.MysqlDataSourceFactory; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.util.ConfigUtils; + + +public class MysqlSchedulerLeaseDeterminationStore implements SchedulerLeaseDeterminationStore { + public static final String CONFIG_PREFIX = "MysqlSchedulerLeaseDeterminationStore"; + + protected final DataSource dataSource; + private final DagActionStore dagActionStore; + private final String tableName; + private final long epsilon; + private final long linger; + /* TODO: + - define retention on this table + - initialize table with epsilon and linger if one already doesn't exist using these configs + - join with table above to ensure epsilon/linger values are consistent across hosts (in case hosts are deployed with different configs) + */ + protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE flow_group=? AND flow_name=? AND flow_execution_id=? " + + "AND flow_action=? AND ABS(trigger_event_timestamp-?) <= %s"; + protected static final String ATTEMPT_INSERT_AND_GET_PURSUANT_TIMESTAMP_STATEMENT = "INSERT INTO %s (flow_group, " + + "flow_name, flow_execution_id, flow_action, trigger_event_timestamp) VALUES (?, ?, ?, ?, ?) 
WHERE NOT EXISTS (" + + "SELECT * FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW + "; SELECT ROW_COUNT() AS rows_inserted_count, " + + "pursuant_timestamp FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW; + + protected static final String UPDATE_PURSUANT_TIMESTAMP_STATEMENT = "UPDATE %s SET pursuant_timestamp = NULL " + + WHERE_CLAUSE_TO_MATCH_ROW; + private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %S (" + "flow_group varchar(" + + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar(" + + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + "flow_execution_id varchar(" + + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, flow_action varchar(100) NOT NULL, " + + "trigger_event_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, " + + "pursuant_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP," + + "PRIMARY KEY (flow_group,flow_name,flow_execution_id,flow_action,trigger_event_timestamp)"; + + @Inject + public MysqlSchedulerLeaseDeterminationStore(Config config, DagActionStore dagActionStore) throws IOException { + if (config.hasPath(CONFIG_PREFIX)) { + config = config.getConfig(CONFIG_PREFIX).withFallback(config); + } else { + throw new IOException("Please specify the config for MysqlSchedulerLeaseDeterminationStore"); + } + + this.tableName = ConfigUtils.getString(config, ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY, + ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE); + this.epsilon = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS_KEY, + ConfigurationKeys.DEFAULT_SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS); + this.linger = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS_KEY, + ConfigurationKeys.DEFAULT_SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS); + + this.dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker()); + try (Connection connection = dataSource.getConnection(); + 
PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) { + createStatement.executeUpdate(); + connection.commit(); + } catch (SQLException e) { + throw new IOException("Table creation failure for " + tableName, e); + } + this.dagActionStore = dagActionStore; + } + + @Override + public LeaseAttemptStatus attemptInsertAndGetPursuantTimestamp(String flowGroup, String flowName, + String flowExecutionId, FlowActionType flowActionType, long triggerTimeMillis) + throws IOException { + Timestamp triggerTimestamp = new Timestamp(triggerTimeMillis); + try (Connection connection = this.dataSource.getConnection(); + PreparedStatement insertStatement = connection.prepareStatement( + String.format(ATTEMPT_INSERT_AND_GET_PURSUANT_TIMESTAMP_STATEMENT, tableName, tableName, epsilon, tableName, + epsilon))) { + int i = 0; + // Values to set in new row + insertStatement.setString(++i, flowGroup); + insertStatement.setString(++i, flowName); + insertStatement.setString(++i, flowExecutionId); + insertStatement.setString(++i, flowActionType.toString()); + insertStatement.setTimestamp(++i, triggerTimestamp); + // Values to check if existing row matches + insertStatement.setString(++i, flowGroup); + insertStatement.setString(++i, flowName); + insertStatement.setString(++i, flowExecutionId); + insertStatement.setString(++i, flowActionType.toString()); + insertStatement.setTimestamp(++i, triggerTimestamp); + // Values to make select statement to read row + insertStatement.setString(++i, flowGroup); + insertStatement.setString(++i, flowName); + insertStatement.setString(++i, flowExecutionId); + insertStatement.setString(++i, flowActionType.toString()); + insertStatement.setTimestamp(++i, triggerTimestamp); + ResultSet resultSet = insertStatement.executeQuery(); + connection.commit(); + + if (!resultSet.next()) { + resultSet.close(); + throw new IOException(String.format("Unexpected error where no result returned while trying to 
obtain lease. " + + "This error indicates that no entry existed for trigger flow event for table %s flow group: %s, flow " + + "name: %s flow execution id: %s and trigger timestamp: %s when one should have been inserted", + tableName, flowGroup, flowName, flowExecutionId, triggerTimestamp)); + } + // If a row was inserted, then we have obtained the lease + int rowsUpdated = resultSet.getInt(1); + if (rowsUpdated == 1) { + // If the pursuing flow launch has been persisted to the {@link DagActionStore} we have completed lease obtainment + this.dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.LAUNCH); + if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.LAUNCH)) { Review Comment: not certain I haven't gotten confused, but... is the code here jumping in to carry out its task immediately upon obtaining the lease? if so, I recommend instead to separate concerns and have the present class solely determine lease status. leave it instead to the caller to separately act on the situation where the lease was successfully acquired. not only would code be more reusable, but also easier to test in isolation (i.e. w/o mocking `DagActionStore`). ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlSchedulerLeaseDeterminationStore.java: ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.api; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; + +import com.google.inject.Inject; +import com.typesafe.config.Config; + +import javax.sql.DataSource; + +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metastore.MysqlDataSourceFactory; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.util.ConfigUtils; + + +public class MysqlSchedulerLeaseDeterminationStore implements SchedulerLeaseDeterminationStore { + public static final String CONFIG_PREFIX = "MysqlSchedulerLeaseDeterminationStore"; + + protected final DataSource dataSource; + private final DagActionStore dagActionStore; + private final String tableName; + private final long epsilon; + private final long linger; + /* TODO: + - define retention on this table + - initialize table with epsilon and linger if one already doesn't exist using these configs + - join with table above to ensure epsilon/linger values are consistent across hosts (in case hosts are deployed with different configs) + */ + protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE flow_group=? AND flow_name=? AND flow_execution_id=? " + + "AND flow_action=? AND ABS(trigger_event_timestamp-?) 
<= %s"; + protected static final String ATTEMPT_INSERT_AND_GET_PURSUANT_TIMESTAMP_STATEMENT = "INSERT INTO %s (flow_group, " + + "flow_name, flow_execution_id, flow_action, trigger_event_timestamp) VALUES (?, ?, ?, ?, ?) WHERE NOT EXISTS (" + + "SELECT * FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW + "; SELECT ROW_COUNT() AS rows_inserted_count, " + + "pursuant_timestamp FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW; + + protected static final String UPDATE_PURSUANT_TIMESTAMP_STATEMENT = "UPDATE %s SET pursuant_timestamp = NULL " + + WHERE_CLAUSE_TO_MATCH_ROW; + private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %S (" + "flow_group varchar(" + + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar(" + + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + "flow_execution_id varchar(" + + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, flow_action varchar(100) NOT NULL, " + + "trigger_event_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, " + + "pursuant_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP," + + "PRIMARY KEY (flow_group,flow_name,flow_execution_id,flow_action,trigger_event_timestamp)"; + + @Inject + public MysqlSchedulerLeaseDeterminationStore(Config config, DagActionStore dagActionStore) throws IOException { + if (config.hasPath(CONFIG_PREFIX)) { + config = config.getConfig(CONFIG_PREFIX).withFallback(config); + } else { + throw new IOException("Please specify the config for MysqlSchedulerLeaseDeterminationStore"); + } + + this.tableName = ConfigUtils.getString(config, ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY, + ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE); + this.epsilon = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS_KEY, + ConfigurationKeys.DEFAULT_SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS); + this.linger = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS_KEY, + 
ConfigurationKeys.DEFAULT_SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS); + + this.dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker()); + try (Connection connection = dataSource.getConnection(); + PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) { + createStatement.executeUpdate(); + connection.commit(); + } catch (SQLException e) { + throw new IOException("Table creation failure for " + tableName, e); + } + this.dagActionStore = dagActionStore; + } + + @Override + public LeaseAttemptStatus attemptInsertAndGetPursuantTimestamp(String flowGroup, String flowName, + String flowExecutionId, FlowActionType flowActionType, long triggerTimeMillis) + throws IOException { + Timestamp triggerTimestamp = new Timestamp(triggerTimeMillis); + try (Connection connection = this.dataSource.getConnection(); + PreparedStatement insertStatement = connection.prepareStatement( + String.format(ATTEMPT_INSERT_AND_GET_PURSUANT_TIMESTAMP_STATEMENT, tableName, tableName, epsilon, tableName, + epsilon))) { + int i = 0; + // Values to set in new row + insertStatement.setString(++i, flowGroup); + insertStatement.setString(++i, flowName); + insertStatement.setString(++i, flowExecutionId); + insertStatement.setString(++i, flowActionType.toString()); + insertStatement.setTimestamp(++i, triggerTimestamp); + // Values to check if existing row matches + insertStatement.setString(++i, flowGroup); + insertStatement.setString(++i, flowName); + insertStatement.setString(++i, flowExecutionId); + insertStatement.setString(++i, flowActionType.toString()); + insertStatement.setTimestamp(++i, triggerTimestamp); + // Values to make select statement to read row + insertStatement.setString(++i, flowGroup); + insertStatement.setString(++i, flowName); + insertStatement.setString(++i, flowExecutionId); + insertStatement.setString(++i, flowActionType.toString()); + insertStatement.setTimestamp(++i, 
triggerTimestamp); + ResultSet resultSet = insertStatement.executeQuery(); + connection.commit(); + + if (!resultSet.next()) { + resultSet.close(); + throw new IOException(String.format("Unexpected error where no result returned while trying to obtain lease. " + + "This error indicates that no entry existed for trigger flow event for table %s flow group: %s, flow " + + "name: %s flow execution id: %s and trigger timestamp: %s when one should have been inserted", + tableName, flowGroup, flowName, flowExecutionId, triggerTimestamp)); + } + // If a row was inserted, then we have obtained the lease + int rowsUpdated = resultSet.getInt(1); + if (rowsUpdated == 1) { + // If the pursuing flow launch has been persisted to the {@link DagActionStore} we have completed lease obtainment + this.dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.LAUNCH); + if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.LAUNCH)) { + if (updatePursuantTimestamp(flowGroup, flowName, flowExecutionId, flowActionType, triggerTimestamp)) { + // TODO: potentially add metric here to count number of flows scheduled by each scheduler + LOG.info("Host completed obtaining lease for flow group: %s, flow name: %s flow execution id: %s and " + + "trigger timestamp: %s", flowGroup, flowName, flowExecutionId, triggerTimestamp); + resultSet.close(); + return LeaseAttemptStatus.LEASE_OBTAINED; + } else { + LOG.warn("Unable to update pursuant timestamp after persisting flow launch to DagActionStore for flow " + + "group: %s, flow name: %s flow execution id: %s and trigger timestamp: %s.", flowGroup, flowName, + flowExecutionId, triggerTimestamp); + } + } else { + LOG.warn("Did not find flow launch action in DagActionStore after adding it for flow group: %s, flow name: " + + "%s flow execution id: %s and trigger timestamp: %s.", flowGroup, flowName, flowExecutionId, + triggerTimestamp); + } + } else if (rowsUpdated > 1) { 
+ resultSet.close(); + throw new IOException(String.format("Expect at most 1 row in table for a given trigger event. %s rows " + + "exist for the trigger flow event for table %s flow group: %s, flow name: %s flow execution id: %s " + + "and trigger timestamp: %s.", i, tableName, flowGroup, flowName, flowExecutionId, triggerTimestamp)); + } + Timestamp pursuantTimestamp = resultSet.getTimestamp(2); + resultSet.close(); + long currentTimeMillis = System.currentTimeMillis(); + // Another host has obtained lease and no further steps required + if (pursuantTimestamp == null) { + LOG.info("Another host has already successfully obtained lease for flow group: %s, flow name: %s flow execution " + + "id: %s and trigger timestamp: %s", flowGroup, flowName, flowExecutionId, triggerTimeMillis); + return LeaseAttemptStatus.LEASE_OBTAINED; + } else if (pursuantTimestamp.getTime() + linger <= currentTimeMillis) { + return LeaseAttemptStatus.PREVIOUS_LEASE_EXPIRED; + } + // Previous lease owner still has valid lease (pursuant + linger > current timestamp) + return LeaseAttemptStatus.PREVIOUS_LEASE_VALID; + } catch (SQLException e) { + throw new IOException(String.format("Error encountered while trying to obtain lease on trigger flow event for " + + "table %s flow group: %s, flow name: %s flow execution id: %s and trigger timestamp: %s", tableName, + flowGroup, flowName, flowExecutionId, triggerTimestamp), e); + } + } + + @Override + public boolean updatePursuantTimestamp(String flowGroup, String flowName, String flowExecutionId, + FlowActionType flowActionType, Timestamp triggerTimestamp) + throws IOException { + try (Connection connection = this.dataSource.getConnection(); + PreparedStatement updateStatement = connection.prepareStatement( + String.format(UPDATE_PURSUANT_TIMESTAMP_STATEMENT, tableName, epsilon))) { + int i = 0; + updateStatement.setString(++i, flowGroup); + updateStatement.setString(++i, flowName); + updateStatement.setString(++i, flowExecutionId); + 
updateStatement.setString(++i, flowActionType.toString()); + updateStatement.setTimestamp(++i, triggerTimestamp); + i = updateStatement.executeUpdate(); Review Comment: quite confusing to repurpose the var used for the index during statement preparation to later become the count of rows modified. why not just create a separate `final` var dedicated to the latter purpose? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java: ########## @@ -54,27 +53,26 @@ public void resume(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, E String flowName = key.getKey().getFlowName(); Long flowExecutionId = key.getKey().getFlowExecutionId(); try { - // If an existing resume or kill request is still pending then do not accept this request - if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId.toString())) { - DagActionStore.DagActionValue action = this.dagActionStore.getDagAction(flowGroup, flowName, flowExecutionId.toString()).getDagActionValue(); - this.handleException(flowGroup, flowName, flowExecutionId.toString(), - new RuntimeException("There is already a pending action " + action + " for this flow. Please wait to resubmit and wait for" + // If an existing resume request is still pending then do not accept this request + if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.DagActionValue.RESUME)) { + this.handleException(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.DagActionValue.RESUME, + new RuntimeException("There is already a pending RESUME action for this flow. 
Please wait to resubmit and wait for" + " action to be completed.")); return; } this.dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.DagActionValue.RESUME); - } catch (IOException | SQLException | SpecNotFoundException e) { + } catch (IOException | SQLException e) { log.warn( String.format("Failed to add execution resume action for flow %s %s %s to dag action store due to", flowGroup, flowName, flowExecutionId), e); - this.handleException(flowGroup, flowName, flowExecutionId.toString(), e); + this.handleException(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.DagActionValue.RESUME, e); } } - private void handleException (String flowGroup, String flowName, String flowExecutionId, Exception e) { + private void handleException (String flowGroup, String flowName, String flowExecutionId, DagActionStore.DagActionValue dagActionValue, Exception e) { Review Comment: I'm not clear whether I've missed something, but is `Exception e` used merely to call `e.getMessage()`? if so, why are we passing in an exception, which generates a stacktrace, etc., rather than merely passing in a `String errMessage`? ########## gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java: ########## @@ -119,28 +158,8 @@ protected void processMessage(DecodeableKafkaRecord message) { return; } - // Retrieve the Dag Action taken from MySQL table unless operation is DELETE - DagActionStore.DagActionValue dagAction = null; - if (!operation.equals("DELETE")) { - try { - dagAction = dagActionStore.getDagAction(flowGroup, flowName, flowExecutionId).getDagActionValue(); - } catch (IOException e) { - log.error("Encountered IOException trying to retrieve dagAction for flow group: {} name: {} executionId: {}. 
" + "Exception: {}", flowGroup, flowName, flowExecutionId, e); - this.unexpectedErrors.mark(); - return; - } catch (SpecNotFoundException e) { - log.error("DagAction not found for flow group: {} name: {} executionId: {} Exception: {}", flowGroup, flowName, - flowExecutionId, e); - this.unexpectedErrors.mark(); - return; - } catch (SQLException throwables) { - log.error("Encountered SQLException trying to retrieve dagAction for flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup, flowName, flowExecutionId, throwables); - return; - } - } - - // We only expert INSERT and DELETE operations done to this table. INSERTs correspond to resume or delete flow - // requests that have to be processed. DELETEs require no action. + // We only expect INSERT and DELETE operations done to this table. INSERTs correspond to any type of + // {@link DagActionStore.DagACtionValue} flow requests that have to be processed. DELETEs require no action. Review Comment: DagA(c)tionValue ########## gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java: ########## @@ -151,7 +170,16 @@ protected void processMessage(DecodeableKafkaRecord message) { log.info("Received insert dag action and about to send kill flow request"); dagManager.handleKillFlowRequest(flowGroup, flowName, Long.parseLong(flowExecutionId)); this.killsInvoked.mark(); - } else { + } else if (dagAction.equals(DagActionStore.DagActionValue.LAUNCH)) { + // If multi-active scheduler is NOT turned on we should not receive these type of events + if (!this.isMultiActiveSchedulerEnabled) { + log.warn("Received LAUNCH dagAction while not in multi-active scheduler mode for flow group: {}, flow name:" + + "{}, execution id: {}, dagAction: {}", flowGroup, flowName, flowExecutionId, dagAction); + this.unexpectedErrors.mark(); + } + log.info("Received insert dag action and about to forward launch request to DagManager"); + submitFlowToDagManager(flowGroup, flowName); + }else { 
log.warn("Received unsupported dagAction {}. Expected to be a KILL or RESUME", dagAction); Review Comment: ...or `LAUNCH` ########## gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlSchedulerLeaseDeterminationStore.java: ########## @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.runtime.api; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; + +import com.google.inject.Inject; +import com.typesafe.config.Config; + +import javax.sql.DataSource; + +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metastore.MysqlDataSourceFactory; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.util.ConfigUtils; + + +public class MysqlSchedulerLeaseDeterminationStore implements SchedulerLeaseDeterminationStore { + public static final String CONFIG_PREFIX = "MysqlSchedulerLeaseDeterminationStore"; + + protected final DataSource dataSource; + private final DagActionStore dagActionStore; + private final String tableName; + private final long epsilon; + private final long linger; + /* TODO: + - define retention on this table + - initialize table with epsilon and linger if one already doesn't exist using these configs + - join with table above to ensure epsilon/linger values are consistent across hosts (in case hosts are deployed with different configs) + */ + protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE flow_group=? AND flow_name=? AND flow_execution_id=? " + + "AND flow_action=? AND ABS(trigger_event_timestamp-?) <= %s"; + protected static final String ATTEMPT_INSERT_AND_GET_PURSUANT_TIMESTAMP_STATEMENT = "INSERT INTO %s (flow_group, " + + "flow_name, flow_execution_id, flow_action, trigger_event_timestamp) VALUES (?, ?, ?, ?, ?) 
WHERE NOT EXISTS (" + + "SELECT * FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW + "; SELECT ROW_COUNT() AS rows_inserted_count, " + + "pursuant_timestamp FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW; + + protected static final String UPDATE_PURSUANT_TIMESTAMP_STATEMENT = "UPDATE %s SET pursuant_timestamp = NULL " + + WHERE_CLAUSE_TO_MATCH_ROW; + private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %S (" + "flow_group varchar(" + + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar(" + + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + "flow_execution_id varchar(" + + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, flow_action varchar(100) NOT NULL, " + + "trigger_event_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, " + + "pursuant_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP," + + "PRIMARY KEY (flow_group,flow_name,flow_execution_id,flow_action,trigger_event_timestamp)"; + + @Inject + public MysqlSchedulerLeaseDeterminationStore(Config config, DagActionStore dagActionStore) throws IOException { + if (config.hasPath(CONFIG_PREFIX)) { + config = config.getConfig(CONFIG_PREFIX).withFallback(config); + } else { + throw new IOException("Please specify the config for MysqlSchedulerLeaseDeterminationStore"); + } + + this.tableName = ConfigUtils.getString(config, ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY, + ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE); + this.epsilon = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS_KEY, + ConfigurationKeys.DEFAULT_SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS); + this.linger = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS_KEY, + ConfigurationKeys.DEFAULT_SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS); + + this.dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker()); + try (Connection connection = dataSource.getConnection(); + 
PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) { + createStatement.executeUpdate(); + connection.commit(); + } catch (SQLException e) { + throw new IOException("Table creation failure for " + tableName, e); + } + this.dagActionStore = dagActionStore; + } + + @Override + public LeaseAttemptStatus attemptInsertAndGetPursuantTimestamp(String flowGroup, String flowName, + String flowExecutionId, FlowActionType flowActionType, long triggerTimeMillis) + throws IOException { + Timestamp triggerTimestamp = new Timestamp(triggerTimeMillis); + try (Connection connection = this.dataSource.getConnection(); + PreparedStatement insertStatement = connection.prepareStatement( + String.format(ATTEMPT_INSERT_AND_GET_PURSUANT_TIMESTAMP_STATEMENT, tableName, tableName, epsilon, tableName, + epsilon))) { + int i = 0; + // Values to set in new row + insertStatement.setString(++i, flowGroup); + insertStatement.setString(++i, flowName); + insertStatement.setString(++i, flowExecutionId); + insertStatement.setString(++i, flowActionType.toString()); + insertStatement.setTimestamp(++i, triggerTimestamp); + // Values to check if existing row matches + insertStatement.setString(++i, flowGroup); + insertStatement.setString(++i, flowName); + insertStatement.setString(++i, flowExecutionId); + insertStatement.setString(++i, flowActionType.toString()); + insertStatement.setTimestamp(++i, triggerTimestamp); + // Values to make select statement to read row + insertStatement.setString(++i, flowGroup); + insertStatement.setString(++i, flowName); + insertStatement.setString(++i, flowExecutionId); + insertStatement.setString(++i, flowActionType.toString()); + insertStatement.setTimestamp(++i, triggerTimestamp); + ResultSet resultSet = insertStatement.executeQuery(); + connection.commit(); + + if (!resultSet.next()) { + resultSet.close(); + throw new IOException(String.format("Unexpected error where no result returned while trying to 
obtain lease. " + + "This error indicates that no entry existed for trigger flow event for table %s flow group: %s, flow " + + "name: %s flow execution id: %s and trigger timestamp: %s when one should have been inserted", + tableName, flowGroup, flowName, flowExecutionId, triggerTimestamp)); + } + // If a row was inserted, then we have obtained the lease + int rowsUpdated = resultSet.getInt(1); + if (rowsUpdated == 1) { + // If the pursuing flow launch has been persisted to the {@link DagActionStore} we have completed lease obtainment + this.dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.LAUNCH); + if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.LAUNCH)) { + if (updatePursuantTimestamp(flowGroup, flowName, flowExecutionId, flowActionType, triggerTimestamp)) { + // TODO: potentially add metric here to count number of flows scheduled by each scheduler + LOG.info("Host completed obtaining lease for flow group: %s, flow name: %s flow execution id: %s and " + + "trigger timestamp: %s", flowGroup, flowName, flowExecutionId, triggerTimestamp); + resultSet.close(); + return LeaseAttemptStatus.LEASE_OBTAINED; + } else { + LOG.warn("Unable to update pursuant timestamp after persisting flow launch to DagActionStore for flow " + + "group: %s, flow name: %s flow execution id: %s and trigger timestamp: %s.", flowGroup, flowName, + flowExecutionId, triggerTimestamp); + } + } else { + LOG.warn("Did not find flow launch action in DagActionStore after adding it for flow group: %s, flow name: " + + "%s flow execution id: %s and trigger timestamp: %s.", flowGroup, flowName, flowExecutionId, + triggerTimestamp); + } + } else if (rowsUpdated > 1) { + resultSet.close(); + throw new IOException(String.format("Expect at most 1 row in table for a given trigger event. 
%s rows " + + "exist for the trigger flow event for table %s flow group: %s, flow name: %s flow execution id: %s " + + "and trigger timestamp: %s.", i, tableName, flowGroup, flowName, flowExecutionId, triggerTimestamp)); + } + Timestamp pursuantTimestamp = resultSet.getTimestamp(2); + resultSet.close(); + long currentTimeMillis = System.currentTimeMillis(); + // Another host has obtained lease and no further steps required + if (pursuantTimestamp == null) { + LOG.info("Another host has already successfully obtained lease for flow group: %s, flow name: %s flow execution " + + "id: %s and trigger timestamp: %s", flowGroup, flowName, flowExecutionId, triggerTimeMillis); + return LeaseAttemptStatus.LEASE_OBTAINED; + } else if (pursuantTimestamp.getTime() + linger <= currentTimeMillis) { + return LeaseAttemptStatus.PREVIOUS_LEASE_EXPIRED; + } + // Previous lease owner still has valid lease (pursuant + linger > current timestamp) + return LeaseAttemptStatus.PREVIOUS_LEASE_VALID; + } catch (SQLException e) { + throw new IOException(String.format("Error encountered while trying to obtain lease on trigger flow event for " + + "table %s flow group: %s, flow name: %s flow execution id: %s and trigger timestamp: %s", tableName, + flowGroup, flowName, flowExecutionId, triggerTimestamp), e); + } + } + + @Override + public boolean updatePursuantTimestamp(String flowGroup, String flowName, String flowExecutionId, + FlowActionType flowActionType, Timestamp triggerTimestamp) + throws IOException { + try (Connection connection = this.dataSource.getConnection(); + PreparedStatement updateStatement = connection.prepareStatement( Review Comment: perhaps consider reining in boiler plate w/ something like `getPreparedStatement` - https://github.com/apache/gobblin/blob/51a852d506b749b9ac33568aff47105e14972a57/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java#L356 ########## 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/SchedulerLeaseAlgoHandler.java: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; +import java.util.Locale; +import java.util.Properties; +import java.util.Random; + +import org.quartz.JobKey; +import org.quartz.SchedulerException; +import org.quartz.Trigger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.typesafe.config.Config; + +import javax.inject.Inject; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.SchedulerLeaseDeterminationStore; +import org.apache.gobblin.scheduler.JobScheduler; +import org.apache.gobblin.scheduler.SchedulerService; +import org.apache.gobblin.util.ConfigUtils; + + +public class SchedulerLeaseAlgoHandler { + private static final Logger LOG = LoggerFactory.getLogger(SchedulerLeaseAlgoHandler.class); + private final long linger; + private final int staggerUpperBoundSec; + private static Random random = new 
Random(); + protected SchedulerLeaseDeterminationStore leaseDeterminationStore; + protected JobScheduler jobScheduler; + protected SchedulerService schedulerService; + @Inject + public SchedulerLeaseAlgoHandler(Config config, SchedulerLeaseDeterminationStore leaseDeterminationStore, + JobScheduler jobScheduler, SchedulerService schedulerService) + throws IOException { + this.linger = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS_KEY, + ConfigurationKeys.DEFAULT_SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS); + this.staggerUpperBoundSec = ConfigUtils.getInt(config, + ConfigurationKeys.SCHEDULER_STAGGERING_UPPER_BOUND_SEC_KEY, + ConfigurationKeys.DEFAULT_SCHEDULER_STAGGERING_UPPER_BOUND_SEC); + this.leaseDeterminationStore = leaseDeterminationStore; + this.jobScheduler = jobScheduler; + this.schedulerService = schedulerService; + } + private SchedulerLeaseDeterminationStore schedulerLeaseDeterminationStore; + + /** + * This method is used in the multi-active scheduler case for one or more hosts to respond to a flow's trigger event + * by attempting a lease for the flow event. + * @param jobProps + * @param flowGroup + * @param flowName + * @param flowExecutionId + * @param flowActionType + * @param triggerTimeMillis + * @return true if this host obtained the lease for this flow's trigger event, false otherwise. 
+ * @throws IOException + */ + public boolean handleNewTriggerEvent(Properties jobProps, String flowGroup, String flowName, String flowExecutionId, + SchedulerLeaseDeterminationStore.FlowActionType flowActionType, long triggerTimeMillis) + throws IOException { + SchedulerLeaseDeterminationStore.LeaseAttemptStatus leaseAttemptStatus = + schedulerLeaseDeterminationStore.attemptInsertAndGetPursuantTimestamp(flowGroup, flowName, flowExecutionId, + flowActionType, triggerTimeMillis); + // TODO: add a log event or metric for each of these cases + switch (leaseAttemptStatus) { + case LEASE_OBTAINED: + return true; + case PREVIOUS_LEASE_EXPIRED: + // recursively try obtaining lease again immediately, stops when reaches one of the other cases + return handleNewTriggerEvent(jobProps, flowGroup, flowName, flowExecutionId, flowActionType, triggerTimeMillis); Review Comment: see comment elsewhere... this should not happen ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/SchedulerLeaseAlgoHandler.java: ########## @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.orchestration; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; +import java.util.Locale; +import java.util.Properties; +import java.util.Random; + +import org.quartz.JobKey; +import org.quartz.SchedulerException; +import org.quartz.Trigger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.typesafe.config.Config; + +import javax.inject.Inject; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.SchedulerLeaseDeterminationStore; +import org.apache.gobblin.scheduler.JobScheduler; +import org.apache.gobblin.scheduler.SchedulerService; +import org.apache.gobblin.util.ConfigUtils; + + +public class SchedulerLeaseAlgoHandler { Review Comment: all classes deserve javadoc ########## gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java: ########## @@ -151,7 +170,16 @@ protected void processMessage(DecodeableKafkaRecord message) { log.info("Received insert dag action and about to send kill flow request"); dagManager.handleKillFlowRequest(flowGroup, flowName, Long.parseLong(flowExecutionId)); this.killsInvoked.mark(); - } else { + } else if (dagAction.equals(DagActionStore.DagActionValue.LAUNCH)) { + // If multi-active scheduler is NOT turned on we should not receive these type of events + if (!this.isMultiActiveSchedulerEnabled) { + log.warn("Received LAUNCH dagAction while not in multi-active scheduler mode for flow group: {}, flow name:" + + "{}, execution id: {}, dagAction: {}", flowGroup, flowName, flowExecutionId, dagAction); + this.unexpectedErrors.mark(); + } + log.info("Received insert dag action and about to forward launch request to DagManager"); + submitFlowToDagManager(flowGroup, flowName); Review Comment: so even if not enabled for such behavior, we still proceed to submit this flow? 
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected].
