[
https://issues.apache.org/jira/browse/GOBBLIN-1837?focusedWorklogId=862947&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-862947
]
ASF GitHub Bot logged work on GOBBLIN-1837:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 31/May/23 08:30
Start Date: 31/May/23 08:30
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3700:
URL: https://github.com/apache/gobblin/pull/3700#discussion_r1211180809
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlSchedulerLeaseDeterminationStore.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
+import javax.sql.DataSource;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class MysqlSchedulerLeaseDeterminationStore implements SchedulerLeaseDeterminationStore {
+ public static final String CONFIG_PREFIX = "MysqlSchedulerLeaseDeterminationStore";
+
+ protected final DataSource dataSource;
+ private final DagActionStore dagActionStore;
+ private final String tableName;
+ private final long epsilon;
+ private final long linger;
Review Comment:
suggest naming these w/ a unit suffix, like `Millis`, etc.
... but wait! shouldn't these be values stored in the DB and referenced
from queries, rather than held in java-space, where there's no guarantee they
would always be exactly the same for all hosts?
it is all but impossible to debug a distributed algo, if the participants
are permitted to "drift", and thereby reach inconsistent conclusions about the
same inputs at a given point in time.
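A minimal sketch of that direction, which matches the TODO that appears further down in this file. It assumes a hypothetical single-row constants table: the first host to start seeds `epsilon_millis`/`linger_millis` from its config, and lease queries read epsilon from the DB instead of formatting in a per-host Java field. The class, table, and column names (including `event_timestamp`) are illustrative only.

```java
/**
 * Hypothetical sketch: epsilon/linger live in one shared DB row so that all hosts
 * evaluate lease queries against identical values. All names are illustrative.
 */
final class SharedLeaseConstantsSql {
  // The fixed primary key (id = 1) guarantees the table holds at most one row.
  static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
      + "id TINYINT NOT NULL PRIMARY KEY, "
      + "epsilon_millis INT NOT NULL, "
      + "linger_millis INT NOT NULL)";

  // Seeds the row from the starting host's config. INSERT IGNORE skips the insert when row 1
  // already exists, so a host deployed with different configs cannot overwrite the shared values.
  static final String INITIALIZE_CONSTANTS_STATEMENT =
      "INSERT IGNORE INTO %s (id, epsilon_millis, linger_millis) VALUES (1, ?, ?)";

  // Lease queries join against the constants row rather than embedding a per-host epsilon.
  static final String SELECT_ROW_WITHIN_EPSILON_STATEMENT =
      "SELECT l.* FROM %s l JOIN %s c ON c.id = 1 "
      + "WHERE l.flow_group=? AND l.flow_name=? AND l.flow_execution_id=? AND l.flow_action=? "
      + "AND ABS(TIMESTAMPDIFF(MICROSECOND, l.event_timestamp, ?)) <= c.epsilon_millis * 1000";

  static String createConstantsTable(String constantsTable) {
    return String.format(CREATE_CONSTANTS_TABLE_STATEMENT, constantsTable);
  }

  static String initializeConstants(String constantsTable) {
    return String.format(INITIALIZE_CONSTANTS_STATEMENT, constantsTable);
  }

  static String selectRowWithinEpsilon(String leaseTable, String constantsTable) {
    return String.format(SELECT_ROW_WITHIN_EPSILON_STATEMENT, leaseTable, constantsTable);
  }
}
```

Under this design the Java-side values only seed the table. Naming them with a unit suffix (`epsilonMillis`, `lingerMillis`) makes that role explicit, and a host whose config disagrees with the seeded row still behaves consistently with its peers.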
##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -95,6 +95,24 @@ public class ConfigurationKeys {
public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = "skip.scheduling.flows.after.num.days";
public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
+ // Scheduler lease determination store configuration
+ // TODO: multiActiveScheduler change here update values for the following keys and rename to more meaningful
+ public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_JDBC_DRIVER_KEY = "state.store.db.jdbc.driver";
+ public static final String DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
Review Comment:
this constant is already in a few other places. could we reference one here
rather than repeating? e.g. `DEFAULT_STATE_STORE_DB_JDBC_DRIVER`
##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -95,6 +95,24 @@ public class ConfigurationKeys {
public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 500;
public static final String SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS = "skip.scheduling.flows.after.num.days";
public static final int DEFAULT_NUM_DAYS_TO_SKIP_AFTER = 365;
+ // Scheduler lease determination store configuration
+ // TODO: multiActiveScheduler change here update values for the following keys and rename to more meaningful
+ public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_JDBC_DRIVER_KEY = "state.store.db.jdbc.driver";
+ public static final String DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
+ public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_URL_KEY = "state.store.db.url";
+ public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_USER_KEY = "state.store.db.user";
+ public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_PASSWORD_KEY = "state.store.db.password";
+ public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY = "state.store.db.table";
Review Comment:
is it intentional that these would "borrow" the same key as others, like:
```
public static final String STATE_STORE_DB_URL_KEY = "state.store.db.url";
public static final String STATE_STORE_DB_USER_KEY = "state.store.db.user";
```
(just 20 or so lines above)?
doing so seems very confusing
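One way to remove the ambiguity, sketched under assumptions: give the lease store its own, distinctly named keys (the `scheduler.lease.determination.store.` prefix is hypothetical), fall back to the shared `state.store.db.*` keys only explicitly, and reference the existing default driver constant instead of repeating the literal. A plain `Map` stands in for Typesafe `Config` so the sketch is self-contained, and `SharedKeys` is a stand-in for the constants quoted above, not the real `ConfigurationKeys` class.

```java
import java.util.Map;
import java.util.Optional;

final class LeaseStoreConfigKeys {
  /** Stand-ins for existing shared constants (key values as quoted in the review). */
  static final class SharedKeys {
    static final String STATE_STORE_DB_URL_KEY = "state.store.db.url";
    static final String STATE_STORE_DB_USER_KEY = "state.store.db.user";
    // Assumed to hold the same driver literal the PR repeats.
    static final String DEFAULT_STATE_STORE_DB_JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";
  }

  // Hypothetical dedicated namespace: constant names and key strings now agree.
  static final String PREFIX = "scheduler.lease.determination.store.";
  static final String DB_URL_KEY = PREFIX + "db.url";
  static final String DB_USER_KEY = PREFIX + "db.user";
  // Reference the existing default rather than repeating the driver class name.
  static final String DEFAULT_DB_JDBC_DRIVER = SharedKeys.DEFAULT_STATE_STORE_DB_JDBC_DRIVER;

  /** Prefers the lease-store key; falls back to the shared key only when the former is absent. */
  static Optional<String> resolve(Map<String, String> config, String leaseKey, String sharedKey) {
    String value = config.get(leaseKey);
    return Optional.ofNullable(value != null ? value : config.get(sharedKey));
  }
}
```

The fallback makes any sharing of the state-store database a visible, deliberate step at lookup time rather than a side effect of two constants that happen to hold the same string.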
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlSchedulerLeaseDeterminationStore.java:
##########
@@ -0,0 +1,207 @@
+ /* TODO:
+ - define retention on this table
+ - initialize table with epsilon and linger if one already doesn't exist using these configs
+ - join with table above to ensure epsilon/linger values are consistent across hosts (in case hosts are deployed with different configs)
+ */
+ protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE flow_group=? AND flow_name=? AND flow_execution_id=? "
+ + "AND flow_action=? AND ABS(trigger_event_timestamp-?) <= %s";
+ protected static final String ATTEMPT_INSERT_AND_GET_PURSUANT_TIMESTAMP_STATEMENT = "INSERT INTO %s (flow_group, "
+ + "flow_name, flow_execution_id, flow_action, trigger_event_timestamp) VALUES (?, ?, ?, ?, ?) WHERE NOT EXISTS ("
+ + "SELECT * FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW + "; SELECT ROW_COUNT() AS rows_inserted_count, "
+ + "pursuant_timestamp FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW;
+
+ protected static final String UPDATE_PURSUANT_TIMESTAMP_STATEMENT = "UPDATE %s SET pursuant_timestamp = NULL "
+ + WHERE_CLAUSE_TO_MATCH_ROW;
+ private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %S (" + "flow_group varchar("
+ + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar("
+ + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + "flow_execution_id varchar("
+ + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, flow_action varchar(100) NOT NULL, "
+ + "trigger_event_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, "
+ + "pursuant_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"
+ + "PRIMARY KEY (flow_group,flow_name,flow_execution_id,flow_action,trigger_event_timestamp)";
Review Comment:
why should `trigger_event_timestamp` be part of the primary key? rather it
seems an attribute of the record identified by the other four fields.
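A sketch of the DDL with the four identifying columns as the key and the event timestamp as an ordinary attribute. It also adopts the `event_timestamp` and `lease_acquisition_timestamp` names suggested elsewhere in this review. The length constants are hypothetical stand-ins for the `ServiceConfigKeys` values. Two further details in the original string are worth checking. First, it never closes the opening `(` of `CREATE TABLE`. Second, `%S` (uppercase) makes `String.format` upper-case the table name, which would not match the `%s` used by the other statements wherever MySQL table names are case-sensitive (the default on Linux).

```java
final class LeaseTableDdl {
  // Hypothetical stand-ins for the ServiceConfigKeys length limits.
  static final int MAX_FLOW_GROUP_LENGTH = 128;
  static final int MAX_FLOW_NAME_LENGTH = 128;
  static final int MAX_FLOW_EXECUTION_ID_LENGTH = 13;

  static String createTableStatement(String tableName) {
    return String.format("CREATE TABLE IF NOT EXISTS %s ("
        + "flow_group VARCHAR(%d) NOT NULL, "
        + "flow_name VARCHAR(%d) NOT NULL, "
        + "flow_execution_id VARCHAR(%d) NOT NULL, "
        + "flow_action VARCHAR(100) NOT NULL, "
        // An attribute of the record, not part of its identity; millisecond precision.
        + "event_timestamp TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), "
        // Declared NULL-able, since the update statement in this PR sets it to NULL.
        + "lease_acquisition_timestamp TIMESTAMP(3) NULL DEFAULT CURRENT_TIMESTAMP(3), "
        // Final ")" closes the column list opened after the table name.
        + "PRIMARY KEY (flow_group, flow_name, flow_execution_id, flow_action))",
        tableName, MAX_FLOW_GROUP_LENGTH, MAX_FLOW_NAME_LENGTH, MAX_FLOW_EXECUTION_ID_LENGTH);
  }

  /** True when every '(' has a matching ')'; a cheap guard against missing-paren bugs. */
  static boolean parensBalanced(String sql) {
    int depth = 0;
    for (char c : sql.toCharArray()) {
      if (c == '(') {
        depth++;
      } else if (c == ')' && --depth < 0) {
        return false;
      }
    }
    return depth == 0;
  }
}
```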
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlSchedulerLeaseDeterminationStore.java:
##########
@@ -0,0 +1,207 @@
+ + "trigger_event_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, "
Review Comment:
nit: there's no reason to limit this impl to being about solely trigger
events. (it could be instead for resume events). hence I suggest: naming
simply `event_timestamp`
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlSchedulerLeaseDeterminationStore.java:
##########
@@ -0,0 +1,207 @@
+ protected static final String ATTEMPT_INSERT_AND_GET_PURSUANT_TIMESTAMP_STATEMENT = "INSERT INTO %s (flow_group, "
+ + "flow_name, flow_execution_id, flow_action, trigger_event_timestamp) VALUES (?, ?, ?, ?, ?) WHERE NOT EXISTS ("
+ + "SELECT * FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW + "; SELECT ROW_COUNT() AS rows_inserted_count, "
+ + "pursuant_timestamp FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW;
Review Comment:
is there a missing close parens here? I'm having trouble mentally parsing...
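Counting parentheses, the subquery opened by `WHERE NOT EXISTS (` is indeed never closed before the `;`. Beyond that, MySQL's `INSERT` syntax does not, to our knowledge, accept a `WHERE` clause after `VALUES`; the usual shape for a conditional insert is `INSERT ... SELECT ... FROM DUAL WHERE NOT EXISTS (...)`. Chaining a second statement with `;` would also be rejected over JDBC unless Connector/J's `allowMultiQueries` is enabled, and `executeUpdate()` already returns the inserted-row count that `ROW_COUNT()` was fetching. A sketch under those assumptions, using hypothetical names and the exact-match `WHERE` clause:

```java
final class LeaseInsertSql {
  // Exact match on the identifying columns only.
  static final String WHERE_CLAUSE_TO_MATCH_KEY =
      "WHERE flow_group=? AND flow_name=? AND flow_execution_id=? AND flow_action=?";

  // Conditional insert: the SELECT yields one row only when no matching row exists.
  // Note the trailing ")" that closes the NOT EXISTS subquery.
  static final String ATTEMPT_INSERT_STATEMENT =
      "INSERT INTO %s (flow_group, flow_name, flow_execution_id, flow_action, event_timestamp) "
      + "SELECT ?, ?, ?, ?, ? FROM DUAL WHERE NOT EXISTS (SELECT 1 FROM %s "
      + WHERE_CLAUSE_TO_MATCH_KEY + ")";

  // Read back in a separate statement instead of chaining with ';'.
  static final String GET_ROW_STATEMENT =
      "SELECT event_timestamp, lease_acquisition_timestamp FROM %s " + WHERE_CLAUSE_TO_MATCH_KEY;

  static String attemptInsert(String tableName) {
    return String.format(ATTEMPT_INSERT_STATEMENT, tableName, tableName);
  }

  static String getRow(String tableName) {
    return String.format(GET_ROW_STATEMENT, tableName);
  }

  static long countPlaceholders(String sql) {
    return sql.chars().filter(c -> c == '?').count();
  }
}
```

With this shape, `executeUpdate()` returning 1 would presumably mean this host inserted the row and holds the lease, while 0 means another host got there first and the existing row should be read with `GET_ROW_STATEMENT`.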
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlSchedulerLeaseDeterminationStore.java:
##########
@@ -0,0 +1,207 @@
+ + "pursuant_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"
Review Comment:
as mentioned, 'pursuing' is likely to be clearer and more familiar than
'pursuant'. `lease_timestamp` or `lease_acquisition_timestamp` are other
possibilities.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java:
##########
@@ -91,21 +89,20 @@ public UpdateResponse delete(ComplexResourceKey<org.apache.gobblin.service.FlowS
String flowName = key.getKey().getFlowName();
Long flowExecutionId = key.getKey().getFlowExecutionId();
try {
- // If an existing resume or kill request is still pending then do not accept this request
- if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId.toString())) {
- DagActionStore.DagActionValue action = this.dagActionStore.getDagAction(flowGroup, flowName, flowExecutionId.toString()).getDagActionValue();
- this.handleException(flowGroup, flowName, flowExecutionId.toString(),
- new RuntimeException("There is already a pending " + action + " action for this flow. Please wait to resubmit and wait for"
+ // If an existing kill request is still pending then do not accept this request
+ if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.DagActionValue.KILL)) {
+ this.handleException(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.DagActionValue.KILL,
+ new RuntimeException("There is already a pending KILL action for this flow. Please wait to resubmit and wait for"
Review Comment:
feels like déjà vu... I believe I just read this code above, but w/
`s/RESUME/KILL/g`. if so, can't we DRY it up?
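If the resume and kill handlers differ only in the action value, a shared guard might look like this sketch. `DagActionValue` and `ActionStore` are minimal stand-ins for `DagActionStore.DagActionValue` and the four-argument `exists` call shown in the diff. The helper name `rejectIfPending` is hypothetical, and the message stops at the first sentence because the original text is truncated in the diff.

```java
import java.util.Optional;

final class PendingActionGuard {
  // Stand-in for DagActionStore.DagActionValue.
  enum DagActionValue { KILL, RESUME }

  // Stand-in for the slice of DagActionStore used by the handlers.
  interface ActionStore {
    boolean exists(String flowGroup, String flowName, String flowExecutionId, DagActionValue action);
  }

  /**
   * Shared by both handlers: returns an error message when the same action is already pending
   * for this flow execution, or empty when the request may proceed.
   */
  static Optional<String> rejectIfPending(ActionStore store, String flowGroup, String flowName,
      long flowExecutionId, DagActionValue action) {
    if (store.exists(flowGroup, flowName, Long.toString(flowExecutionId), action)) {
      // Only the first sentence of the original message is reproduced here.
      return Optional.of("There is already a pending " + action + " action for this flow.");
    }
    return Optional.empty();
  }
}
```

Each handler would then call the helper once with its own action and pass any returned message on to `handleException`.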
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlSchedulerLeaseDeterminationStore.java:
##########
@@ -0,0 +1,207 @@
+ protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE flow_group=? AND flow_name=? AND flow_execution_id=? "
+ + "AND flow_action=? AND ABS(trigger_event_timestamp-?) <= %s";
Review Comment:
this doesn't necessarily pinpoint a row, so much as constrain what could be
multiple rows. I suggest leaving it as only the first four comparisons, which
are all exact equality, and leaving off the `ABS`/`<=` relative cmp here. the
latter can be added in the few instances where the specific value of
`trigger_event_timestamp` is unknown.
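Following that suggestion, a sketch that keeps the exact-match clause on its own and appends the relative timestamp comparison only where needed. It also binds epsilon as a parameter instead of formatting it into the SQL, and it uses `TIMESTAMPDIFF(MICROSECOND, ...)`. As we understand MySQL's type conversion, subtracting two `TIMESTAMP` values directly operates on their `YYYYMMDDhhmmss` numeric forms rather than yielding a duration in milliseconds. Constant and column names are hypothetical.

```java
final class LeaseWhereClauses {
  // Exact equality on the identifying columns only; with these four columns as the
  // primary key, this matches at most one row.
  static final String WHERE_CLAUSE_TO_MATCH_KEY =
      "WHERE flow_group=? AND flow_name=? AND flow_execution_id=? AND flow_action=?";

  // Appended only where the exact event timestamp is unknown. Parameters: the candidate
  // timestamp, then epsilon in milliseconds (scaled to microseconds for TIMESTAMPDIFF).
  static final String AND_EVENT_TIMESTAMP_WITHIN_EPSILON =
      " AND ABS(TIMESTAMPDIFF(MICROSECOND, event_timestamp, ?)) <= ? * 1000";

  static String matchKey() {
    return WHERE_CLAUSE_TO_MATCH_KEY;
  }

  static String matchKeyNearTimestamp() {
    return WHERE_CLAUSE_TO_MATCH_KEY + AND_EVENT_TIMESTAMP_WITHIN_EPSILON;
  }
}
```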
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlSchedulerLeaseDeterminationStore.java:
##########
@@ -0,0 +1,207 @@
+ @Inject
+ public MysqlSchedulerLeaseDeterminationStore(Config config, DagActionStore dagActionStore) throws IOException {
+ if (config.hasPath(CONFIG_PREFIX)) {
+ config = config.getConfig(CONFIG_PREFIX).withFallback(config);
+ } else {
+ throw new IOException("Please specify the config for MysqlSchedulerLeaseDeterminationStore");
+ }
+
+ this.tableName = ConfigUtils.getString(config, ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE);
+ this.epsilon = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS);
+ this.linger = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS);
+
+ this.dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker());
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) {
+ createStatement.executeUpdate();
+ connection.commit();
+ } catch (SQLException e) {
+ throw new IOException("Table creation failure for " + tableName, e);
+ }
+ this.dagActionStore = dagActionStore;
+ }
+
+ @Override
+ public LeaseAttemptStatus attemptInsertAndGetPursuantTimestamp(String flowGroup, String flowName,
+ String flowExecutionId, FlowActionType flowActionType, long triggerTimeMillis)
+ throws IOException {
+ Timestamp triggerTimestamp = new Timestamp(triggerTimeMillis);
+ try (Connection connection = this.dataSource.getConnection();
+ PreparedStatement insertStatement = connection.prepareStatement(
+ String.format(ATTEMPT_INSERT_AND_GET_PURSUANT_TIMESTAMP_STATEMENT, tableName, tableName, epsilon, tableName,
+ epsilon))) {
+ int i = 0;
+ // Values to set in new row
+ insertStatement.setString(++i, flowGroup);
+ insertStatement.setString(++i, flowName);
+ insertStatement.setString(++i, flowExecutionId);
+ insertStatement.setString(++i, flowActionType.toString());
+ insertStatement.setTimestamp(++i, triggerTimestamp);
+ // Values to check if existing row matches
+ insertStatement.setString(++i, flowGroup);
+ insertStatement.setString(++i, flowName);
+ insertStatement.setString(++i, flowExecutionId);
+ insertStatement.setString(++i, flowActionType.toString());
+ insertStatement.setTimestamp(++i, triggerTimestamp);
+ // Values to make select statement to read row
+ insertStatement.setString(++i, flowGroup);
+ insertStatement.setString(++i, flowName);
+ insertStatement.setString(++i, flowExecutionId);
+ insertStatement.setString(++i, flowActionType.toString());
+ insertStatement.setTimestamp(++i, triggerTimestamp);
Review Comment:
I'm in favor of raising the level of abstraction here. there's no need to
follow the pattern of `completeInsertPreparedStatement` exactly, but that is
one way to do so:
https://github.com/apache/gobblin/blob/51a852d506b749b9ac33568aff47105e14972a57/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java#L112
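A sketch of one possible abstraction, assuming the tuple is always bound in the same column order. A hypothetical `bindFlowActionTuple` helper binds the five values starting at a given index and returns the next free index, so each occurrence of the clause in the SQL costs one call instead of five. The `recordingStatement` test double, built with `java.lang.reflect.Proxy`, only exists so the sketch runs without a database.

```java
import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.List;

final class LeaseStatementBinder {
  /**
   * Hypothetical helper: binds (flowGroup, flowName, flowExecutionId, flowAction, eventTimestamp)
   * starting at {@code startIndex} and returns the next unbound parameter index.
   */
  static int bindFlowActionTuple(PreparedStatement statement, int startIndex, String flowGroup,
      String flowName, String flowExecutionId, String flowAction, Timestamp eventTimestamp)
      throws SQLException {
    int i = startIndex;
    statement.setString(i++, flowGroup);
    statement.setString(i++, flowName);
    statement.setString(i++, flowExecutionId);
    statement.setString(i++, flowAction);
    statement.setTimestamp(i++, eventTimestamp);
    return i;
  }

  /** Test double: records "methodName(parameterIndex)" for each call; only void setters are used. */
  static PreparedStatement recordingStatement(List<String> calls) {
    return (PreparedStatement) Proxy.newProxyInstance(
        LeaseStatementBinder.class.getClassLoader(),
        new Class<?>[] {PreparedStatement.class},
        (proxy, method, args) -> {
          calls.add(method.getName() + "(" + (args == null ? "" : args[0]) + ")");
          return null;
        });
  }
}
```

In the method above, the three hand-written groups of five setter calls would then collapse to three chained `bindFlowActionTuple` calls, each starting at the index returned by the previous one.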
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlSchedulerLeaseDeterminationStore.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
+import javax.sql.DataSource;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class MysqlSchedulerLeaseDeterminationStore implements
SchedulerLeaseDeterminationStore {
+ public static final String CONFIG_PREFIX =
"MysqlSchedulerLeaseDeterminationStore";
+
+ protected final DataSource dataSource;
+ private final DagActionStore dagActionStore;
+ private final String tableName;
+ private final long epsilon;
+ private final long linger;
+ /* TODO:
+ - define retention on this table
+ - initialize table with epsilon and linger if one already doesn't exist using these configs
+ - join with table above to ensure epsilon/linger values are consistent across hosts (in case hosts are deployed with different configs)
+ */
+ protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE flow_group=? AND flow_name=? AND flow_execution_id=? "
+ + "AND flow_action=? AND ABS(trigger_event_timestamp-?) <= %s";
+ protected static final String ATTEMPT_INSERT_AND_GET_PURSUANT_TIMESTAMP_STATEMENT = "INSERT INTO %s (flow_group, "
+ + "flow_name, flow_execution_id, flow_action, trigger_event_timestamp) VALUES (?, ?, ?, ?, ?) WHERE NOT EXISTS ("
+ + "SELECT * FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW + "; SELECT ROW_COUNT() AS rows_inserted_count, "
+ + "pursuant_timestamp FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW;
+
+ protected static final String UPDATE_PURSUANT_TIMESTAMP_STATEMENT = "UPDATE %s SET pursuant_timestamp = NULL "
+ + WHERE_CLAUSE_TO_MATCH_ROW;
+ private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %S (" + "flow_group varchar("
+ + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar("
+ + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + "flow_execution_id varchar("
+ + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, flow_action varchar(100) NOT NULL, "
+ + "trigger_event_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, "
+ + "pursuant_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"
+ + "PRIMARY KEY (flow_group,flow_name,flow_execution_id,flow_action,trigger_event_timestamp)";
+
+ @Inject
+ public MysqlSchedulerLeaseDeterminationStore(Config config, DagActionStore dagActionStore) throws IOException {
+ if (config.hasPath(CONFIG_PREFIX)) {
+ config = config.getConfig(CONFIG_PREFIX).withFallback(config);
+ } else {
+ throw new IOException("Please specify the config for MysqlSchedulerLeaseDeterminationStore");
+ }
+
+ this.tableName = ConfigUtils.getString(config, ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE);
+ this.epsilon = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS);
+ this.linger = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS);
+
+ this.dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker());
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) {
+ createStatement.executeUpdate();
+ connection.commit();
+ } catch (SQLException e) {
+ throw new IOException("Table creation failure for " + tableName, e);
+ }
+ this.dagActionStore = dagActionStore;
+ }
+
+ @Override
+ public LeaseAttemptStatus attemptInsertAndGetPursuantTimestamp(String flowGroup, String flowName,
+ String flowExecutionId, FlowActionType flowActionType, long triggerTimeMillis)
+ throws IOException {
+ Timestamp triggerTimestamp = new Timestamp(triggerTimeMillis);
+ try (Connection connection = this.dataSource.getConnection();
+ PreparedStatement insertStatement = connection.prepareStatement(
+ String.format(ATTEMPT_INSERT_AND_GET_PURSUANT_TIMESTAMP_STATEMENT, tableName, tableName, epsilon, tableName,
+ epsilon))) {
+ int i = 0;
+ // Values to set in new row
+ insertStatement.setString(++i, flowGroup);
+ insertStatement.setString(++i, flowName);
+ insertStatement.setString(++i, flowExecutionId);
+ insertStatement.setString(++i, flowActionType.toString());
+ insertStatement.setTimestamp(++i, triggerTimestamp);
+ // Values to check if existing row matches
+ insertStatement.setString(++i, flowGroup);
+ insertStatement.setString(++i, flowName);
+ insertStatement.setString(++i, flowExecutionId);
+ insertStatement.setString(++i, flowActionType.toString());
+ insertStatement.setTimestamp(++i, triggerTimestamp);
+ // Values to make select statement to read row
+ insertStatement.setString(++i, flowGroup);
+ insertStatement.setString(++i, flowName);
+ insertStatement.setString(++i, flowExecutionId);
+ insertStatement.setString(++i, flowActionType.toString());
+ insertStatement.setTimestamp(++i, triggerTimestamp);
+ ResultSet resultSet = insertStatement.executeQuery();
+ connection.commit();
+
+ if (!resultSet.next()) {
+ resultSet.close();
+ throw new IOException(String.format("Unexpected error where no result returned while trying to obtain lease. "
+ + "This error indicates that no entry existed for trigger flow event for table %s flow group: %s, flow "
+ + "name: %s flow execution id: %s and trigger timestamp: %s when one should have been inserted",
+ tableName, flowGroup, flowName, flowExecutionId, triggerTimestamp));
+ }
+ // If a row was inserted, then we have obtained the lease
+ int rowsUpdated = resultSet.getInt(1);
+ if (rowsUpdated == 1) {
+ // If the pursuing flow launch has been persisted to the {@link DagActionStore} we have completed lease obtainment
+ this.dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.LAUNCH);
+ if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.LAUNCH)) {
+ if (updatePursuantTimestamp(flowGroup, flowName, flowExecutionId, flowActionType, triggerTimestamp)) {
+ // TODO: potentially add metric here to count number of flows scheduled by each scheduler
+ LOG.info("Host completed obtaining lease for flow group: %s, flow name: %s flow execution id: %s and "
+ + "trigger timestamp: %s", flowGroup, flowName, flowExecutionId, triggerTimestamp);
+ resultSet.close();
+ return LeaseAttemptStatus.LEASE_OBTAINED;
+ } else {
+ LOG.warn("Unable to update pursuant timestamp after persisting flow launch to DagActionStore for flow "
+ + "group: %s, flow name: %s flow execution id: %s and trigger timestamp: %s.", flowGroup, flowName,
+ flowExecutionId, triggerTimestamp);
+ }
+ } else {
+ LOG.warn("Did not find flow launch action in DagActionStore after adding it for flow group: %s, flow name: "
+ + "%s flow execution id: %s and trigger timestamp: %s.", flowGroup, flowName, flowExecutionId,
+ triggerTimestamp);
+ }
+ } else if (rowsUpdated > 1) {
+ resultSet.close();
+ throw new IOException(String.format("Expect at most 1 row in table for a given trigger event. %s rows "
+ + "exist for the trigger flow event for table %s flow group: %s, flow name: %s flow execution id: %s "
+ + "and trigger timestamp: %s.", i, tableName, flowGroup, flowName, flowExecutionId, triggerTimestamp));
+ }
+ Timestamp pursuantTimestamp = resultSet.getTimestamp(2);
+ resultSet.close();
+ long currentTimeMillis = System.currentTimeMillis();
+ // Another host has obtained lease and no further steps required
+ if (pursuantTimestamp == null) {
+ LOG.info("Another host has already successfully obtained lease for flow group: %s, flow name: %s flow execution "
+ + "id: %s and trigger timestamp: %s", flowGroup, flowName, flowExecutionId, triggerTimeMillis);
+ return LeaseAttemptStatus.LEASE_OBTAINED;
+ } else if (pursuantTimestamp.getTime() + linger <= currentTimeMillis) {
+ return LeaseAttemptStatus.PREVIOUS_LEASE_EXPIRED;
+ }
+ // Previous lease owner still has valid lease (pursuant + linger > current timestamp)
+ return LeaseAttemptStatus.PREVIOUS_LEASE_VALID;
+ } catch (SQLException e) {
+ throw new IOException(String.format("Error encountered while trying to obtain lease on trigger flow event for "
+ + "table %s flow group: %s, flow name: %s flow execution id: %s and trigger timestamp: %s", tableName,
+ flowGroup, flowName, flowExecutionId, triggerTimestamp), e);
+ }
+ }
+
+ @Override
+ public boolean updatePursuantTimestamp(String flowGroup, String flowName, String flowExecutionId,
+ FlowActionType flowActionType, Timestamp triggerTimestamp)
+ throws IOException {
+ try (Connection connection = this.dataSource.getConnection();
+ PreparedStatement updateStatement = connection.prepareStatement(
+ String.format(UPDATE_PURSUANT_TIMESTAMP_STATEMENT, tableName, epsilon))) {
+ int i = 0;
+ updateStatement.setString(++i, flowGroup);
+ updateStatement.setString(++i, flowName);
+ updateStatement.setString(++i, flowExecutionId);
+ updateStatement.setString(++i, flowActionType.toString());
+ updateStatement.setTimestamp(++i, triggerTimestamp);
+ i = updateStatement.executeUpdate();
+ connection.commit();
+
+ if (i != 1) {
+ LOG.warn("Expected to update 1 row's pursuant timestamp for a flow trigger event but instead updated {}", i);
+ }
+ return i >= 1;
+ } catch (SQLException e) {
+ throw new IOException(String.format("Encountered exception while trying to update pursuant timestamp to null for "
+ + "flowGroup: %s flowName: %s flowExecutionId: %s flowAction: %s triggerTimestamp: %s. Exception is %s",
+ flowGroup, flowName, flowExecutionId, flowActionType, triggerTimestamp, e));
Review Comment:
To preserve the stack trace of the original exception, we'd want the two-arg `IOException(String, Throwable)` constructor, passing `e` as the cause rather than formatting it into the message.
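The sketch below shows the difference. The class and method names are hypothetical, and the message is shortened from the one in the PR. Passing `e` as the second argument keeps it reachable through `getCause()`, so it is printed as a `Caused by:` section with its own stack trace:

```java
import java.io.IOException;
import java.sql.SQLException;

/** Hypothetical illustration of exception chaining; not Gobblin code. */
final class ExceptionWrapping {
  private ExceptionWrapping() {}

  /** Wraps an SQLException so callers still see the original failure as the cause. */
  static IOException wrapUpdateFailure(String flowGroup, String flowName, SQLException e) {
    // The (String, Throwable) constructor records e as the cause; formatting e with %s
    // would keep only its toString() and drop its stack trace.
    return new IOException(String.format(
        "Encountered exception while trying to update pursuant timestamp to null for "
            + "flowGroup: %s flowName: %s", flowGroup, flowName), e);
  }
}
```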
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlSchedulerLeaseDeterminationStore.java:
##########
@@ -0,0 +1,207 @@
+ // If a row was inserted, then we have obtained the lease
+ int rowsUpdated = resultSet.getInt(1);
+ if (rowsUpdated == 1) {
+ // If the pursuing flow launch has been persisted to the {@link DagActionStore} we have completed lease obtainment
+ this.dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.LAUNCH);
+ if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.LAUNCH)) {
Review Comment:
Not certain I haven't gotten confused, but... is the code here jumping in to carry out its task immediately upon obtaining the lease? If so, I recommend separating concerns: have the present class solely determine lease status, and leave it to the caller to act separately when the lease was successfully acquired. Not only would the code be more reusable, it would also be easier to test in isolation (i.e. w/o mocking `DagActionStore`).
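One possible shape for that split is sketched below. Every type in it is hypothetical, including `LeaseOutcome`, which is deliberately not the PR's `LeaseAttemptStatus`. The store only answers "did this host get the lease?", and a caller owns the follow-up step, which would be the `DagActionStore` write in the real code. Both collaborators are single-method interfaces, so a test can supply plain lambdas instead of mocks:

```java
import java.io.IOException;

/** Hypothetical outcome type for this sketch; the PR's LeaseAttemptStatus values differ. */
enum LeaseOutcome { ACQUIRED, OTHER_HOST_HOLDS_LEASE, OTHER_HOST_LEASE_EXPIRED }

/** Hypothetical narrow interface: decides lease status only, with no follow-up side effects. */
interface LeaseDeterminer {
  LeaseOutcome tryAcquire(String flowGroup, String flowName, String flowExecutionId,
      long triggerTimeMillis) throws IOException;
}

/** Hypothetical follow-up step, e.g. persisting a LAUNCH action to the DagActionStore. */
interface LaunchAction {
  void launch(String flowGroup, String flowName, String flowExecutionId) throws IOException;
}

/** The caller, not the lease store, decides what happens once the lease is ours. */
final class LeaseAwareLauncher {
  private final LeaseDeterminer determiner;
  private final LaunchAction action;

  LeaseAwareLauncher(LeaseDeterminer determiner, LaunchAction action) {
    this.determiner = determiner;
    this.action = action;
  }

  /** Returns true only if this host acquired the lease and then ran the launch step. */
  boolean handleTrigger(String flowGroup, String flowName, String flowExecutionId,
      long triggerTimeMillis) throws IOException {
    LeaseOutcome outcome =
        determiner.tryAcquire(flowGroup, flowName, flowExecutionId, triggerTimeMillis);
    if (outcome != LeaseOutcome.ACQUIRED) {
      return false; // another host owns (or owned) this trigger; nothing to launch here
    }
    action.launch(flowGroup, flowName, flowExecutionId);
    return true;
  }
}
```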
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlSchedulerLeaseDeterminationStore.java:
##########
@@ -0,0 +1,207 @@
+ @Override
+ public boolean updatePursuantTimestamp(String flowGroup, String flowName, String flowExecutionId,
+ FlowActionType flowActionType, Timestamp triggerTimestamp)
+ throws IOException {
+ try (Connection connection = this.dataSource.getConnection();
+ PreparedStatement updateStatement = connection.prepareStatement(
+ String.format(UPDATE_PURSUANT_TIMESTAMP_STATEMENT, tableName, epsilon))) {
+ int i = 0;
+ updateStatement.setString(++i, flowGroup);
+ updateStatement.setString(++i, flowName);
+ updateStatement.setString(++i, flowExecutionId);
+ updateStatement.setString(++i, flowActionType.toString());
+ updateStatement.setTimestamp(++i, triggerTimestamp);
+ i = updateStatement.executeUpdate();
Review Comment:
Quite confusing to repurpose the var used for the index during statement preparation to later become the count of rows modified. Why not just create a separate `final` var dedicated to the latter purpose?
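Here is a minimal sketch of that suggestion. The class and method names are hypothetical. It takes an already-prepared statement, and it uses `System.err` in place of the class's `LOG` so that it compiles on its own. The index variable serves only as the parameter cursor, and the row count lives in its own `final` variable:

```java
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;

/** Hypothetical illustration; not Gobblin code. */
final class PursuantTimestampUpdater {
  private PursuantTimestampUpdater() {}

  /** Binds the five match values, runs the UPDATE, and reports whether any row changed. */
  static boolean bindAndExecuteUpdate(PreparedStatement updateStatement, String flowGroup,
      String flowName, String flowExecutionId, String flowAction, Timestamp triggerTimestamp)
      throws SQLException {
    int i = 0; // JDBC parameter index only; never reused for anything else
    updateStatement.setString(++i, flowGroup);
    updateStatement.setString(++i, flowName);
    updateStatement.setString(++i, flowExecutionId);
    updateStatement.setString(++i, flowAction);
    updateStatement.setTimestamp(++i, triggerTimestamp);
    // Dedicated variable for the row count returned by executeUpdate().
    final int numRowsUpdated = updateStatement.executeUpdate();
    if (numRowsUpdated != 1) {
      // Stand-in for LOG.warn in the real class.
      System.err.printf("Expected to update 1 row's pursuant timestamp but updated %d%n", numRowsUpdated);
    }
    return numRowsUpdated >= 1;
  }
}
```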
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandlerWithWarmStandby.java:
##########
@@ -54,27 +53,26 @@ public void resume(ComplexResourceKey<org.apache.gobblin.service.FlowStatusId, E
String flowName = key.getKey().getFlowName();
Long flowExecutionId = key.getKey().getFlowExecutionId();
try {
- // If an existing resume or kill request is still pending then do not accept this request
- if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId.toString())) {
- DagActionStore.DagActionValue action = this.dagActionStore.getDagAction(flowGroup, flowName, flowExecutionId.toString()).getDagActionValue();
- this.handleException(flowGroup, flowName, flowExecutionId.toString(),
- new RuntimeException("There is already a pending action " + action + " for this flow. Please wait to resubmit and wait for"
+ // If an existing resume request is still pending then do not accept this request
+ if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.DagActionValue.RESUME)) {
+ this.handleException(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.DagActionValue.RESUME,
+ new RuntimeException("There is already a pending RESUME action for this flow. Please wait to resubmit and wait for"
+ " action to be completed."));
return;
}
this.dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.DagActionValue.RESUME);
- } catch (IOException | SQLException | SpecNotFoundException e) {
+ } catch (IOException | SQLException e) {
log.warn(
String.format("Failed to add execution resume action for flow %s %s %s to dag action store due to", flowGroup,
flowName, flowExecutionId), e);
- this.handleException(flowGroup, flowName, flowExecutionId.toString(), e);
+ this.handleException(flowGroup, flowName, flowExecutionId.toString(), DagActionStore.DagActionValue.RESUME, e);
}
}
- private void handleException (String flowGroup, String flowName, String flowExecutionId, Exception e) {
+ private void handleException (String flowGroup, String flowName, String flowExecutionId, DagActionStore.DagActionValue dagActionValue, Exception e) {
Review Comment:
I'm not clear whether I've missed something, but is `Exception e` used merely for `e.getMessage()`? If so, why are we passing in an exception, which captures a stack trace, etc., rather than merely passing in a `String errMessage`?
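A rough sketch of the string-based variant follows. The class name is hypothetical. The enum mirrors `DagActionStore.DagActionValue` and is redeclared so the sketch compiles alone. `IllegalStateException` stands in for whatever error the real handler reports to the REST client, which isn't shown in this diff. The "pending RESUME" path would pass its text directly. The `catch` path already logs the full exception via `log.warn(..., e)`, so it could pass `e.getMessage()`:

```java
/** Hypothetical sketch; not the actual resource handler. */
final class ResumeRequestErrors {
  /** Mirrors DagActionStore.DagActionValue; redeclared so the sketch compiles on its own. */
  enum DagActionValue { KILL, RESUME, LAUNCH }

  private ResumeRequestErrors() {}

  /**
   * Variant of handleException that takes the message directly, so the "pending action"
   * path no longer constructs a RuntimeException only to read its message back.
   */
  static void handleError(String flowGroup, String flowName, String flowExecutionId,
      DagActionValue dagActionValue, String errMessage) {
    // Stand-in for the error the real handler surfaces to the REST client.
    throw new IllegalStateException(String.format("%s request for flow %s.%s (execution %s) failed: %s",
        dagActionValue, flowGroup, flowName, flowExecutionId, errMessage));
  }
}
```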
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -119,28 +158,8 @@ protected void processMessage(DecodeableKafkaRecord message) {
return;
}
- // Retrieve the Dag Action taken from MySQL table unless operation is DELETE
- DagActionStore.DagActionValue dagAction = null;
- if (!operation.equals("DELETE")) {
- try {
- dagAction = dagActionStore.getDagAction(flowGroup, flowName, flowExecutionId).getDagActionValue();
- } catch (IOException e) {
- log.error("Encountered IOException trying to retrieve dagAction for flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup, flowName, flowExecutionId, e);
- this.unexpectedErrors.mark();
- return;
- } catch (SpecNotFoundException e) {
- log.error("DagAction not found for flow group: {} name: {} executionId: {} Exception: {}", flowGroup, flowName,
- flowExecutionId, e);
- this.unexpectedErrors.mark();
- return;
- } catch (SQLException throwables) {
- log.error("Encountered SQLException trying to retrieve dagAction for flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup, flowName, flowExecutionId, throwables);
- return;
- }
- }
-
- // We only expert INSERT and DELETE operations done to this table. INSERTs correspond to resume or delete flow
- // requests that have to be processed. DELETEs require no action.
+ // We only expect INSERT and DELETE operations done to this table. INSERTs correspond to any type of
+ // {@link DagActionStore.DagACtionValue} flow requests that have to be processed. DELETEs require no action.
Review Comment:
DagA(c)tionValue
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -151,7 +170,16 @@ protected void processMessage(DecodeableKafkaRecord message) {
log.info("Received insert dag action and about to send kill flow request");
dagManager.handleKillFlowRequest(flowGroup, flowName, Long.parseLong(flowExecutionId));
this.killsInvoked.mark();
- } else {
+ } else if (dagAction.equals(DagActionStore.DagActionValue.LAUNCH)) {
+ // If multi-active scheduler is NOT turned on we should not receive these type of events
+ if (!this.isMultiActiveSchedulerEnabled) {
+ log.warn("Received LAUNCH dagAction while not in multi-active scheduler mode for flow group: {}, flow name:"
+ + "{}, execution id: {}, dagAction: {}", flowGroup, flowName, flowExecutionId, dagAction);
+ this.unexpectedErrors.mark();
+ }
+ log.info("Received insert dag action and about to forward launch request to DagManager");
+ submitFlowToDagManager(flowGroup, flowName);
+ }else {
log.warn("Received unsupported dagAction {}. Expected to be a KILL or RESUME", dagAction);
Review Comment:
...or `LAUNCH`
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlSchedulerLeaseDeterminationStore.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+
+import javax.sql.DataSource;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class MysqlSchedulerLeaseDeterminationStore implements SchedulerLeaseDeterminationStore {
+ public static final String CONFIG_PREFIX = "MysqlSchedulerLeaseDeterminationStore";
+
+ protected final DataSource dataSource;
+ private final DagActionStore dagActionStore;
+ private final String tableName;
+ private final long epsilon;
+ private final long linger;
+ /* TODO:
+ - define retention on this table
+ - initialize table with epsilon and linger if one already doesn't exist using these configs
+ - join with table above to ensure epsilon/linger values are consistent across hosts (in case hosts are deployed with different configs)
+ */
+ protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE flow_group=? AND flow_name=? AND flow_execution_id=? "
+ + "AND flow_action=? AND ABS(trigger_event_timestamp-?) <= %s";
+ protected static final String ATTEMPT_INSERT_AND_GET_PURSUANT_TIMESTAMP_STATEMENT = "INSERT INTO %s (flow_group, "
+ + "flow_name, flow_execution_id, flow_action, trigger_event_timestamp) VALUES (?, ?, ?, ?, ?) WHERE NOT EXISTS ("
+ + "SELECT * FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW + "; SELECT ROW_COUNT() AS rows_inserted_count, "
+ + "pursuant_timestamp FROM %s " + WHERE_CLAUSE_TO_MATCH_ROW;
+
+ protected static final String UPDATE_PURSUANT_TIMESTAMP_STATEMENT = "UPDATE %s SET pursuant_timestamp = NULL "
+ + WHERE_CLAUSE_TO_MATCH_ROW;
+ private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %S (" + "flow_group varchar("
+ + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, flow_name varchar("
+ + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + "flow_execution_id varchar("
+ + ServiceConfigKeys.MAX_FLOW_EXECUTION_ID_LENGTH + ") NOT NULL, flow_action varchar(100) NOT NULL, "
+ + "trigger_event_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, "
+ + "pursuant_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,"
+ + "PRIMARY KEY (flow_group,flow_name,flow_execution_id,flow_action,trigger_event_timestamp)";
+
+ @Inject
+ public MysqlSchedulerLeaseDeterminationStore(Config config, DagActionStore dagActionStore) throws IOException {
+ if (config.hasPath(CONFIG_PREFIX)) {
+ config = config.getConfig(CONFIG_PREFIX).withFallback(config);
+ } else {
+ throw new IOException("Please specify the config for MysqlSchedulerLeaseDeterminationStore");
+ }
+
+ this.tableName = ConfigUtils.getString(config, ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE);
+ this.epsilon = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS);
+ this.linger = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS);
+
+ this.dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker());
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) {
+ createStatement.executeUpdate();
+ connection.commit();
+ } catch (SQLException e) {
+ throw new IOException("Table creation failure for " + tableName, e);
+ }
+ this.dagActionStore = dagActionStore;
+ }
+
+ @Override
+ public LeaseAttemptStatus attemptInsertAndGetPursuantTimestamp(String flowGroup, String flowName,
+ String flowExecutionId, FlowActionType flowActionType, long triggerTimeMillis)
+ throws IOException {
+ Timestamp triggerTimestamp = new Timestamp(triggerTimeMillis);
+ try (Connection connection = this.dataSource.getConnection();
+ PreparedStatement insertStatement = connection.prepareStatement(
+ String.format(ATTEMPT_INSERT_AND_GET_PURSUANT_TIMESTAMP_STATEMENT, tableName, tableName, epsilon, tableName,
+ epsilon))) {
+ int i = 0;
+ // Values to set in new row
+ insertStatement.setString(++i, flowGroup);
+ insertStatement.setString(++i, flowName);
+ insertStatement.setString(++i, flowExecutionId);
+ insertStatement.setString(++i, flowActionType.toString());
+ insertStatement.setTimestamp(++i, triggerTimestamp);
+ // Values to check if existing row matches
+ insertStatement.setString(++i, flowGroup);
+ insertStatement.setString(++i, flowName);
+ insertStatement.setString(++i, flowExecutionId);
+ insertStatement.setString(++i, flowActionType.toString());
+ insertStatement.setTimestamp(++i, triggerTimestamp);
+ // Values to make select statement to read row
+ insertStatement.setString(++i, flowGroup);
+ insertStatement.setString(++i, flowName);
+ insertStatement.setString(++i, flowExecutionId);
+ insertStatement.setString(++i, flowActionType.toString());
+ insertStatement.setTimestamp(++i, triggerTimestamp);
+ ResultSet resultSet = insertStatement.executeQuery();
+ connection.commit();
+
+ if (!resultSet.next()) {
+ resultSet.close();
+ throw new IOException(String.format("Unexpected error where no result returned while trying to obtain lease. "
+ + "This error indicates that no entry existed for trigger flow event for table %s flow group: %s, flow "
+ + "name: %s flow execution id: %s and trigger timestamp: %s when one should have been inserted",
+ tableName, flowGroup, flowName, flowExecutionId, triggerTimestamp));
+ }
+ // If a row was inserted, then we have obtained the lease
+ int rowsUpdated = resultSet.getInt(1);
+ if (rowsUpdated == 1) {
+ // If the pursuing flow launch has been persisted to the {@link DagActionStore} we have completed lease obtainment
+ this.dagActionStore.addDagAction(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.LAUNCH);
+ if (this.dagActionStore.exists(flowGroup, flowName, flowExecutionId, DagActionStore.DagActionValue.LAUNCH)) {
+ if (updatePursuantTimestamp(flowGroup, flowName, flowExecutionId, flowActionType, triggerTimestamp)) {
+ // TODO: potentially add metric here to count number of flows scheduled by each scheduler
+ LOG.info("Host completed obtaining lease for flow group: %s, flow name: %s flow execution id: %s and "
+ + "trigger timestamp: %s", flowGroup, flowName, flowExecutionId, triggerTimestamp);
+ resultSet.close();
+ return LeaseAttemptStatus.LEASE_OBTAINED;
+ } else {
+ LOG.warn("Unable to update pursuant timestamp after persisting flow launch to DagActionStore for flow "
+ + "group: %s, flow name: %s flow execution id: %s and trigger timestamp: %s.", flowGroup, flowName,
+ flowExecutionId, triggerTimestamp);
+ }
+ } else {
+ LOG.warn("Did not find flow launch action in DagActionStore after adding it for flow group: %s, flow name: "
+ + "%s flow execution id: %s and trigger timestamp: %s.", flowGroup, flowName, flowExecutionId,
+ triggerTimestamp);
+ }
+ } else if (rowsUpdated > 1) {
+ resultSet.close();
+ throw new IOException(String.format("Expect at most 1 row in table for a given trigger event. %s rows "
+ + "exist for the trigger flow event for table %s flow group: %s, flow name: %s flow execution id: %s "
+ + "and trigger timestamp: %s.", i, tableName, flowGroup, flowName, flowExecutionId, triggerTimestamp));
+ }
+ Timestamp pursuantTimestamp = resultSet.getTimestamp(2);
+ resultSet.close();
+ long currentTimeMillis = System.currentTimeMillis();
+ // Another host has obtained lease and no further steps required
+ if (pursuantTimestamp == null) {
+ LOG.info("Another host has already successfully obtained lease for flow group: %s, flow name: %s flow execution "
+ + "id: %s and trigger timestamp: %s", flowGroup, flowName, flowExecutionId, triggerTimeMillis);
+ return LeaseAttemptStatus.LEASE_OBTAINED;
+ } else if (pursuantTimestamp.getTime() + linger <= currentTimeMillis) {
+ return LeaseAttemptStatus.PREVIOUS_LEASE_EXPIRED;
+ }
+ // Previous lease owner still has valid lease (pursuant + linger > current timestamp)
+ return LeaseAttemptStatus.PREVIOUS_LEASE_VALID;
+ } catch (SQLException e) {
+ throw new IOException(String.format("Error encountered while trying to obtain lease on trigger flow event for "
+ + "table %s flow group: %s, flow name: %s flow execution id: %s and trigger timestamp: %s", tableName,
+ flowGroup, flowName, flowExecutionId, triggerTimestamp), e);
+ }
+ }
+
+ @Override
+ public boolean updatePursuantTimestamp(String flowGroup, String flowName, String flowExecutionId,
+ FlowActionType flowActionType, Timestamp triggerTimestamp)
+ throws IOException {
+ try (Connection connection = this.dataSource.getConnection();
+ PreparedStatement updateStatement = connection.prepareStatement(
Review Comment:
perhaps consider reining in boilerplate w/ something like `getPreparedStatement` -
https://github.com/apache/gobblin/blob/51a852d506b749b9ac33568aff47105e14972a57/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java#L356
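A sketch of such a helper, under stated assumptions: the linked `MysqlBaseSpecStore` method is not reproduced here, so `withPreparedStatement`, `SqlFunction` and `SqlSupplier` are hypothetical names. It centralizes what each method in this store currently repeats: opening the connection, preparing the statement, the optional commit, closing both, and wrapping `SQLException` in `IOException`.

```java
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/** A JDBC step that may throw SQLException (hypothetical helper type). */
@FunctionalInterface
interface SqlFunction<T, R> {
  R apply(T input) throws SQLException;
}

/** Supplies connections, e.g. dataSource::getConnection (hypothetical helper type). */
@FunctionalInterface
interface SqlSupplier<T> {
  T get() throws SQLException;
}

final class PreparedStatementHelper {
  private PreparedStatementHelper() {}

  /**
   * Opens a connection, prepares sql, runs body on the statement, optionally commits,
   * and closes both resources. Any SQLException (including from close()) is wrapped in
   * an IOException naming the statement, so callers only bind parameters and execute.
   */
  static <R> R withPreparedStatement(SqlSupplier<Connection> connections, String sql,
      SqlFunction<PreparedStatement, R> body, boolean shouldCommit) throws IOException {
    try (Connection connection = connections.get();
        PreparedStatement statement = connection.prepareStatement(sql)) {
      R result = body.apply(statement);
      if (shouldCommit) {
        connection.commit();
      }
      return result;
    } catch (SQLException e) {
      throw new IOException("Failure executing statement: " + sql, e);
    }
  }
}
```

In this store, `dataSource::getConnection` would be passed as the supplier. Any `ResultSet` has to be read inside the body, since the statement is closed when the helper returns.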
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/SchedulerLeaseAlgoHandler.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Random;
+
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.SchedulerLeaseDeterminationStore;
+import org.apache.gobblin.scheduler.JobScheduler;
+import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class SchedulerLeaseAlgoHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(SchedulerLeaseAlgoHandler.class);
+ private final long linger;
+ private final int staggerUpperBoundSec;
+ private static Random random = new Random();
+ protected SchedulerLeaseDeterminationStore leaseDeterminationStore;
+ protected JobScheduler jobScheduler;
+ protected SchedulerService schedulerService;
+ @Inject
+ public SchedulerLeaseAlgoHandler(Config config, SchedulerLeaseDeterminationStore leaseDeterminationStore,
+ JobScheduler jobScheduler, SchedulerService schedulerService)
+ throws IOException {
+ this.linger = ConfigUtils.getLong(config, ConfigurationKeys.SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS);
+ this.staggerUpperBoundSec = ConfigUtils.getInt(config,
+ ConfigurationKeys.SCHEDULER_STAGGERING_UPPER_BOUND_SEC_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_STAGGERING_UPPER_BOUND_SEC);
+ this.leaseDeterminationStore = leaseDeterminationStore;
+ this.jobScheduler = jobScheduler;
+ this.schedulerService = schedulerService;
+ }
+ private SchedulerLeaseDeterminationStore schedulerLeaseDeterminationStore;
+
+ /**
+ * This method is used in the multi-active scheduler case for one or more hosts to respond to a flow's trigger event
+ * by attempting a lease for the flow event.
+ * @param jobProps
+ * @param flowGroup
+ * @param flowName
+ * @param flowExecutionId
+ * @param flowActionType
+ * @param triggerTimeMillis
+ * @return true if this host obtained the lease for this flow's trigger event, false otherwise.
+ * @throws IOException
+ */
+ public boolean handleNewTriggerEvent(Properties jobProps, String flowGroup, String flowName, String flowExecutionId,
+ SchedulerLeaseDeterminationStore.FlowActionType flowActionType, long triggerTimeMillis)
+ throws IOException {
+ SchedulerLeaseDeterminationStore.LeaseAttemptStatus leaseAttemptStatus =
+ schedulerLeaseDeterminationStore.attemptInsertAndGetPursuantTimestamp(flowGroup, flowName, flowExecutionId,
+ flowActionType, triggerTimeMillis);
+ // TODO: add a log event or metric for each of these cases
+ switch (leaseAttemptStatus) {
+ case LEASE_OBTAINED:
+ return true;
+ case PREVIOUS_LEASE_EXPIRED:
+ // recursively try obtaining lease again immediately, stops when reaches one of the other cases
+ return handleNewTriggerEvent(jobProps, flowGroup, flowName, flowExecutionId, flowActionType, triggerTimeMillis);
Review Comment:
see comment elsewhere... this should not happen
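The other comment this one refers to is not part of this excerpt. If a retry on `PREVIOUS_LEASE_EXPIRED` is kept at all, one option is a bounded loop instead of unbounded self-recursion. The local `LeaseAttemptStatus` enum mirrors the three statuses used in this PR. `BoundedLeaseRetrySketch`, `attemptLease` and `maxAttempts` are hypothetical, and the lease attempt is modeled as a `Supplier` so the sketch runs without a database. Treating `PREVIOUS_LEASE_VALID` as "not obtained" is a simplification, because the PR's handling of that case is not shown in this hunk.

```java
import java.util.function.Supplier;

enum LeaseAttemptStatus { LEASE_OBTAINED, PREVIOUS_LEASE_EXPIRED, PREVIOUS_LEASE_VALID }

final class BoundedLeaseRetrySketch {
  private BoundedLeaseRetrySketch() {}

  /**
   * Re-attempts the lease while the store reports PREVIOUS_LEASE_EXPIRED, at most
   * maxAttempts times in total. Returns true only when an attempt reports LEASE_OBTAINED.
   */
  static boolean attemptLease(Supplier<LeaseAttemptStatus> attempt, int maxAttempts) {
    for (int i = 0; i < maxAttempts; i++) {
      LeaseAttemptStatus status = attempt.get();
      if (status == LeaseAttemptStatus.LEASE_OBTAINED) {
        return true;
      }
      if (status == LeaseAttemptStatus.PREVIOUS_LEASE_VALID) {
        return false; // another host still holds a valid lease
      }
      // PREVIOUS_LEASE_EXPIRED: loop and try again, without growing the call stack
    }
    return false; // attempts exhausted; the caller can log or emit a metric here
  }
}
```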
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/SchedulerLeaseAlgoHandler.java:
##########
@@ -0,0 +1,144 @@
+public class SchedulerLeaseAlgoHandler {
Review Comment:
all classes deserve javadoc
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java:
##########
@@ -151,7 +170,16 @@ protected void processMessage(DecodeableKafkaRecord message) {
log.info("Received insert dag action and about to send kill flow request");
dagManager.handleKillFlowRequest(flowGroup, flowName, Long.parseLong(flowExecutionId));
this.killsInvoked.mark();
- } else {
+ } else if (dagAction.equals(DagActionStore.DagActionValue.LAUNCH)) {
+ // If multi-active scheduler is NOT turned on we should not receive these type of events
+ if (!this.isMultiActiveSchedulerEnabled) {
+ log.warn("Received LAUNCH dagAction while not in multi-active scheduler mode for flow group: {}, flow name:"
+ + "{}, execution id: {}, dagAction: {}", flowGroup, flowName, flowExecutionId, dagAction);
+ this.unexpectedErrors.mark();
+ }
+ log.info("Received insert dag action and about to forward launch request to DagManager");
+ submitFlowToDagManager(flowGroup, flowName);
Review Comment:
so even if not enabled for such behavior, we still proceed to submit this
flow?
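If the intent is that a `LAUNCH` event must not be submitted when multi-active scheduling is disabled, an early exit after recording the error makes that explicit. The sketch also lists `LAUNCH` in the unsupported-action warning, per the earlier comment. `LaunchGuardSketch` and `route` are hypothetical, and the local enum stands in for `DagActionStore.DagActionValue`. The method returns a description of the routing decision at the points where the monitor would call the `DagManager` and mark metrics.

```java
enum DagActionValue { KILL, RESUME, LAUNCH }

final class LaunchGuardSketch {
  private LaunchGuardSketch() {}

  /** Describes what the change monitor would do for an inserted dag action. */
  static String route(DagActionValue dagAction, boolean isMultiActiveSchedulerEnabled) {
    switch (dagAction) {
      case RESUME:
        return "resume";
      case KILL:
        return "kill";
      case LAUNCH:
        if (!isMultiActiveSchedulerEnabled) {
          // Unexpected in this mode: the real monitor would mark unexpectedErrors and return here,
          // rather than falling through to submitFlowToDagManager.
          return "skipped: LAUNCH received while multi-active scheduler is disabled";
        }
        return "launch";
      default:
        // Guards against enum values added later.
        return "unsupported dagAction " + dagAction + ". Expected to be a KILL, RESUME, or LAUNCH";
    }
  }
}
```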
Issue Time Tracking
-------------------
Worklog Id: (was: 862947)
Time Spent: 5h (was: 4h 50m)
> Implement multi-active, non blocking for leader host
> ----------------------------------------------------
>
> Key: GOBBLIN-1837
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1837
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 5h
> Remaining Estimate: 0h
>
> This task will include the implementation of non-blocking, multi-active
> scheduler for each host. It will NOT include metric emission or unit tests
> for validation. That will be done in a separate follow-up ticket. The work in
> this ticket includes
> * define a table to do scheduler lease determination for each flow's trigger
> event and related methods to execute actions on this table
> * update DagActionStore schema and DagActionStoreMonitor to act upon new
> "LAUNCH" type events in addition to KILL/RESUME
> * update scheduler/orchestrator logic to apply the non-blocking algorithm
> when "multi-active scheduler mode" is enabled, otherwise submit events
> directly to the DagManager after receiving a scheduler trigger
> * implement the non-blocking algorithm, particularly handling reminder
> events if another host is in the process of securing the lease for a
> particular flow trigger
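The lease decision implemented by `MysqlSchedulerLeaseDeterminationStore` in this PR can be summarized as two pure functions, sketched below assuming timestamps are epoch milliseconds. First, a trigger event within `epsilon` of an existing row counts as the same event (the `ABS(trigger_event_timestamp-?) <= %s` clause). Second, a host that did not insert the row decides based on that row's `pursuant_timestamp` and `linger`. `LeaseDecisionSketch` and its method names are hypothetical. Note that the PR's code returns `LEASE_OBTAINED` both when this host wins and when another host has already completed the lease (a null `pursuant_timestamp`).

```java
enum LeaseAttemptStatus { LEASE_OBTAINED, PREVIOUS_LEASE_EXPIRED, PREVIOUS_LEASE_VALID }

final class LeaseDecisionSketch {
  private LeaseDecisionSketch() {}

  /** Mirrors ABS(trigger_event_timestamp - ?) <= epsilon: triggers this close are one event. */
  static boolean isSameTriggerEvent(long existingTriggerMillis, long newTriggerMillis, long epsilonMillis) {
    return Math.abs(existingTriggerMillis - newTriggerMillis) <= epsilonMillis;
  }

  /**
   * Status for a host that found an existing row, following the PR's rules:
   * null pursuant timestamp means another host completed the lease;
   * pursuant + linger <= now means the previous lease expired; otherwise it is still valid.
   */
  static LeaseAttemptStatus statusForExistingRow(Long pursuantTimestampMillis, long lingerMillis, long nowMillis) {
    if (pursuantTimestampMillis == null) {
      return LeaseAttemptStatus.LEASE_OBTAINED;
    }
    if (pursuantTimestampMillis + lingerMillis <= nowMillis) {
      return LeaseAttemptStatus.PREVIOUS_LEASE_EXPIRED;
    }
    return LeaseAttemptStatus.PREVIOUS_LEASE_VALID;
  }
}
```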
--
This message was sent by Atlassian Jira
(v8.20.10#820010)