phet commented on code in PR #3700:
URL: https://github.com/apache/gobblin/pull/3700#discussion_r1222573977


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MySQLMultiActiveLeaseArbiter.java:
##########
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import com.zaxxer.hikari.HikariDataSource;
+
+import javax.sql.DataSource;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * MySQL based implementation of the {@link MultiActiveLeaseArbiter} which 
uses a MySQL store to resolve ownership of
+ * a flow event amongst multiple competing instances. A MySQL table is used to 
store flow identifying information as
+ * well as the flow action associated with it. It uses two additional values 
of the `event_timestamp` and
+ * `lease_acquisition_timestamp` to indicate an active lease, expired lease, 
and state of no longer leasing. The table
+ * schema is as follows:
+ * [flow_group | flow_name | flow_execution_id | flow_action | event_timestamp 
| lease_acquisition_timestamp]
+ * (----------------------primary key------------------------)
+ * We also maintain another table in the database with two constants that 
allow us to coordinate between instances and
+ * ensure they are using the same values to base their coordination off of.
+ * [epsilon | linger]
+ * `epsilon` - time within which we consider two timestamps to be the same, to 
account for between-host clock drift
+ * `linger` - minimum time to occur before another host may attempt a lease on 
a flow event. It should be much greater
+ *            than epsilon and encapsulate executor communication latency 
including retry attempts
+ *
+ * The `event_timestamp` is the time of the flow_action event request.
+ * ---Event consolidation---
+ * Note that for the sake of simplification, we only allow one event 
associated with a particular flow's flow_action
+ * (i.e., only one LAUNCH of flow FOO, for example, but there can be a LAUNCH, 
KILL, & RESUME for flow FOO at once) during
+ * the time it takes to execute the flow action. In most cases, the execution 
time should be so negligible that this
+ * event consolidation of duplicate flow action requests is not noticed and 
even during executor downtime this behavior
+ * is acceptable as the user generally expects a timely execution of the most 
recent request rather than one execution
+ * per request.
+ *
+ * The `lease_acquisition_timestamp` is the time a host acquired ownership of 
this flow action, and it is valid for
+ * `linger` period of time after which it expires and any host can re-attempt 
ownership. In most cases, the original
+ * host should actually complete its work while having the lease and then mark 
the flow action as NULL to indicate no
+ * further leasing should be done for the event.
+ */
+public class MySQLMultiActiveLeaseArbiter implements MultiActiveLeaseArbiter {
+  /** `j.u.Function` variant for an operation that may @throw IOException or 
SQLException: preserves method signature checked exceptions */
+  @FunctionalInterface
+  protected interface CheckedFunction<T, R> {
+    R apply(T t) throws IOException, SQLException;
+  }
+
+  public static final String CONFIG_PREFIX = "MySQLMultiActiveLeaseArbiter";
+
+  protected final DataSource dataSource;
+  private final String leaseArbiterTableName;
+  private final String constantsTableName;
+  private final int epsilon;
+  private final int linger;
+  protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE 
flow_group=? AND flow_name=? AND flow_execution_id=?"
+      + " AND flow_action=?";
+  protected static final String WHERE_CLAUSE_TO_MATCH_ROW = "WHERE 
flow_group=? AND flow_name=? AND flow_execution_id=?"
+      + " AND flow_action=? AND event_timestamp=? AND 
lease_acquisition_timestamp=?";
+
+  protected static final String SELECT_AFTER_INSERT_STATEMENT = "SELECT 
ROW_COUNT() AS rows_inserted_count, "
+      + "lease_acquisition_timestamp, linger FROM %s, %s " + 
WHERE_CLAUSE_TO_MATCH_KEY;
+
+  // Does a cross join between the two tables to have epsilon and linger 
values available. Returns the following values:
+  // event_timestamp, lease_acquisition_timestamp, isWithinEpsilon (true if 
event_timestamp in table is within
+  // epsilon of the supplied timestamp), leaseValidityStatus (1 if lease has not expired, 2 if expired, 
3 if column is NULL or no longer leasing)
+  protected static final String GET_EVENT_INFO_STATEMENT = "SELECT 
event_timestamp, lease_acquisition_timestamp, "
+      + "abs(event_timestamp - ?) <= epsilon as isWithinEpsilon, CASE "
+      + "WHEN CURRENT_TIMESTAMP < (lease_acquisition_timestamp + linger) then 
1"
+      + "WHEN CURRENT_TIMESTAMP >= (lease_acquisition_timestamp + linger) then 
2"
+      + "ELSE 3 END as leaseValidityStatus, linger FROM %s, %s " + 
WHERE_CLAUSE_TO_MATCH_KEY;

Review Comment:
   what about:
   ```
   (lease_acquisition_timestamp + linger) < CURRENT_TIMESTAMP as isLeaseExpired
   ```
   then you either have:
   1 (TRUE) - expired
   0 (FALSE) - not expired
   NULL - no lease
   
   (IMO easier to follow than SQL `CASE`)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to