This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ae7c0bf1 [GOBBLIN-1923] Add retention for lease arbiter table (#3792)
7ae7c0bf1 is described below

commit 7ae7c0bf149061ad7a762f21355091c9dc1aae64
Author: umustafi <[email protected]>
AuthorDate: Fri Oct 6 13:22:46 2023 -0700

    [GOBBLIN-1923] Add retention for lease arbiter table (#3792)
    
    * Add retention for lease arbiter table
    
    * Replace blocking thread with scheduled thread pool executor
    
    * Make Calendar instance thread-safe
    
    * Rename variables, make values more clear
    
    * Update timestamp related cols
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../gobblin/configuration/ConfigurationKeys.java   |  4 +-
 .../runtime/api/MysqlMultiActiveLeaseArbiter.java  | 92 ++++++++++++++++------
 .../dag_action_store/MysqlDagActionStore.java      |  1 -
 3 files changed, 70 insertions(+), 27 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 81608655a..6d36d9ff3 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -101,6 +101,8 @@ public class ConfigurationKeys {
   public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE 
= "gobblin_multi_active_scheduler_constants_store";
   public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY 
= MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiter.store.db.table";
   public static final String 
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = 
"gobblin_scheduler_lease_determination_store";
+  public static final String 
SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY = 
MYSQL_LEASE_ARBITER_PREFIX + ".retentionPeriodMillis";
+  public static final long 
DEFAULT_SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS = 3 * 24 * 
60 * 60 * 1000; // (3 days in ms)
   // Refers to the event we originally tried to acquire a lease which achieved 
`consensus` among participants through
   // the database
   public static final String 
SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY = 
"preservedConsensusEventTimeMillis";
@@ -116,7 +118,7 @@ public class ConfigurationKeys {
   public static final String SCHEDULER_EVENT_LINGER_MILLIS_KEY = 
MYSQL_LEASE_ARBITER_PREFIX + ".lingerMillis";
   public static final int DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS = 30000;
   public static final String SCHEDULER_MAX_BACKOFF_MILLIS_KEY = 
MYSQL_LEASE_ARBITER_PREFIX + ".maxBackoffMillis";
-  public static final int DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS = 5000;
+  public static final int DEFAULT_SCHEDULER_MAX_BACKOFF_MILLIS = 10000;
 
   // Job executor thread pool size
   public static final String JOB_EXECUTOR_THREAD_POOL_SIZE_KEY = 
"jobexecutor.threadpool.size";
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index a7c035185..4b4b9c837 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -30,6 +30,8 @@ import java.sql.Timestamp;
 import java.util.Calendar;
 import java.util.Optional;
 import java.util.TimeZone;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import javax.sql.DataSource;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
@@ -85,15 +87,16 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   protected final DataSource dataSource;
   private final String leaseArbiterTableName;
   private final String constantsTableName;
-  private final int epsilon;
-  private final int linger;
+  private final int epsilonMillis;
+  private final int lingerMillis;
+  private final long retentionPeriodMillis;
+  private String thisTableRetentionStatement;
   private String thisTableGetInfoStatement;
   private String thisTableGetInfoStatementForReminder;
   private String thisTableSelectAfterInsertStatement;
   private String thisTableAcquireLeaseIfMatchingAllStatement;
   private String thisTableAcquireLeaseIfFinishedStatement;
 
-  // TODO: define retention on this table
   /*
     Notes:
     - Set `event_timestamp` default value to turn off timestamp auto-updates 
for row modifications which alters this col
@@ -110,9 +113,13 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE 
TABLE IF NOT EXISTS %s ("
       + "flow_group varchar(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") 
NOT NULL, flow_name varchar("
       + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + ") NOT NULL, " + " 
flow_action varchar(100) NOT NULL, "
-      + "event_timestamp TIMESTAMP(3) DEFAULT CURRENT_TIMESTAMP(3), "
-      + "lease_acquisition_timestamp TIMESTAMP(3) NULL DEFAULT NULL, "
+      + "event_timestamp TIMESTAMP(3) NOT NULL, "
+      + "lease_acquisition_timestamp TIMESTAMP(3) NULL, "
       + "PRIMARY KEY (flow_group,flow_name,flow_action))";
+  // Deletes rows older than retention time period regardless of lease status 
as they should all be invalid or completed
+  // since retention >> linger
+  private static final String LEASE_ARBITER_TABLE_RETENTION_STATEMENT = 
"DELETE FROM %s WHERE event_timestamp < "
+      + "DATE_SUB(CURRENT_TIMESTAMP(3), INTERVAL ? * 1000 MICROSECOND)";
   private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE 
IF NOT EXISTS %s "
       + "(primary_key INT, epsilon INT, linger INT, PRIMARY KEY 
(primary_key))";
   // Only insert epsilon and linger values from config if this table does not 
contain a pre-existing values already.
@@ -137,7 +144,8 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
       + "WHEN CURRENT_TIMESTAMP(3) < DATE_ADD(lease_acquisition_timestamp, 
INTERVAL linger*1000 MICROSECOND) then 1 "
       + "WHEN CURRENT_TIMESTAMP(3) >= DATE_ADD(lease_acquisition_timestamp, 
INTERVAL linger*1000 MICROSECOND) then 2 "
       + "ELSE 3 END as lease_validity_status, linger, "
-      + "UTC_TIMESTAMP(3) as utc_current_timestamp FROM %s, %s " + 
WHERE_CLAUSE_TO_MATCH_KEY;
+      + "CONVERT_TZ(CURRENT_TIMESTAMP(3), @@session.time_zone, '+00:00') as 
utc_current_timestamp FROM %s, %s "
+      + WHERE_CLAUSE_TO_MATCH_KEY;
   // Same as query above, except that isWithinEpsilon is True if the reminder 
event timestamp (provided by caller) is
   // OLDER than or equal to the db event_timestamp and within epsilon away 
from it.
   protected static final String GET_EVENT_INFO_STATEMENT_FOR_REMINDER = 
"SELECT "
@@ -147,7 +155,8 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
       + "WHEN CURRENT_TIMESTAMP(3) < DATE_ADD(lease_acquisition_timestamp, 
INTERVAL linger*1000 MICROSECOND) then 1 "
       + "WHEN CURRENT_TIMESTAMP(3) >= DATE_ADD(lease_acquisition_timestamp, 
INTERVAL linger*1000 MICROSECOND) then 2 "
       + "ELSE 3 END as lease_validity_status, linger, "
-      + "UTC_TIMESTAMP(3) as utc_current_timestamp FROM %s, %s " + 
WHERE_CLAUSE_TO_MATCH_KEY;
+      + "CONVERT_TZ(CURRENT_TIMESTAMP(3), @@session.time_zone, '+00:00') as 
utc_current_timestamp FROM %s, %s "
+      + WHERE_CLAUSE_TO_MATCH_KEY;
   // Insert or update row to acquire lease if values have not changed since 
the previous read
   // Need to define three separate statements to handle cases where row does 
not exist or has null values to check
   protected static final String ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT 
INTO %s (flow_group, flow_name, "
@@ -162,8 +171,9 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
       + WHERE_CLAUSE_TO_MATCH_ROW;
   // Complete lease acquisition if values have not changed since lease was 
acquired
   protected static final String CONDITIONALLY_COMPLETE_LEASE_STATEMENT = 
"UPDATE %s SET "
-      + "lease_acquisition_timestamp = NULL " + WHERE_CLAUSE_TO_MATCH_ROW;
-  protected static final Calendar UTC_CAL = 
Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+      + "event_timestamp=event_timestamp, lease_acquisition_timestamp = NULL " 
+ WHERE_CLAUSE_TO_MATCH_ROW;
+  private static final ThreadLocal<Calendar> UTC_CAL =
+      ThreadLocal.withInitial(() -> 
Calendar.getInstance(TimeZone.getTimeZone("UTC")));
 
   @Inject
   public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
@@ -178,10 +188,13 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
         
ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE);
     this.constantsTableName = ConfigUtils.getString(config, 
ConfigurationKeys.MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY,
         ConfigurationKeys.DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE);
-    this.epsilon = ConfigUtils.getInt(config, 
ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY,
+    this.epsilonMillis = ConfigUtils.getInt(config, 
ConfigurationKeys.SCHEDULER_EVENT_EPSILON_MILLIS_KEY,
         ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS);
-    this.linger = ConfigUtils.getInt(config, 
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
+    this.lingerMillis = ConfigUtils.getInt(config, 
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
         ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
+    this.retentionPeriodMillis = ConfigUtils.getLong(config, 
ConfigurationKeys.SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS_KEY,
+        
ConfigurationKeys.DEFAULT_SCHEDULER_LEASE_DETERMINATION_TABLE_RETENTION_PERIOD_MILLIS);
+    this.thisTableRetentionStatement = 
String.format(LEASE_ARBITER_TABLE_RETENTION_STATEMENT, 
this.leaseArbiterTableName);
     this.thisTableGetInfoStatement = String.format(GET_EVENT_INFO_STATEMENT, 
this.leaseArbiterTableName,
         this.constantsTableName);
     this.thisTableGetInfoStatementForReminder = 
String.format(GET_EVENT_INFO_STATEMENT_FOR_REMINDER,
@@ -203,7 +216,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
       throw new IOException("Table creation failure for " + 
leaseArbiterTableName, e);
     }
     initializeConstantsTable();
-
+    runRetentionOnArbitrationTable();
     log.info("MysqlMultiActiveLeaseArbiter initialized");
   }
 
@@ -215,12 +228,41 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
     String insertConstantsStatement = 
String.format(UPSERT_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
     withPreparedStatement(insertConstantsStatement, insertStatement -> {
       int i = 0;
-      insertStatement.setInt(++i, epsilon);
-      insertStatement.setInt(++i, linger);
+      insertStatement.setInt(++i, epsilonMillis);
+      insertStatement.setInt(++i, lingerMillis);
       return insertStatement.executeUpdate();
     }, true);
   }
 
+  /**
+   * Schedules a daemon task, first run after 10s then every 4 hours, deleting rows older than the retention period.
+   * TODO: create a utility to run a SQL command in a ScheduledThreadPoolExecutor at a fixed interval
+   */
+  private void runRetentionOnArbitrationTable() {
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, runnable -> {
+      Thread thread = new Thread(runnable, "MysqlMultiActiveLeaseArbiter-retention");
+      thread.setDaemon(true); // executor is never shut down, so its thread must not keep the JVM alive
+      return thread;
+    });
+    Runnable retentionTask = () -> {
+      try {
+        withPreparedStatement(thisTableRetentionStatement, retentionStatement -> {
+          retentionStatement.setLong(1, retentionPeriodMillis);
+          int numRowsDeleted = retentionStatement.executeUpdate();
+          if (numRowsDeleted != 0) {
+            log.info("Multi-active lease arbiter retention thread deleted {} rows from the lease arbiter table",
+                numRowsDeleted);
+          }
+          return numRowsDeleted;
+        }, true);
+      } catch (IOException | RuntimeException e) { // an exception escaping a periodic task cancels all later runs
+        log.error("Failing to run retention on lease arbiter table. Unbounded growth can lead to database slowness and "
+            + "affect our system performance. Examine exception: ", e);
+      }
+    };
+    executor.scheduleAtFixedRate(retentionTask, 10, TimeUnit.HOURS.toSeconds(4), TimeUnit.SECONDS);
+  }
+
   @Override
   public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction 
flowAction, long eventTimeMillis,
       boolean isReminderEvent) throws IOException {
@@ -340,12 +382,12 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws 
IOException {
     try {
       // Extract values from result set
-      Timestamp dbEventTimestamp = 
resultSet.getTimestamp("utc_event_timestamp", UTC_CAL);
-      Timestamp dbLeaseAcquisitionTimestamp = 
resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL);
+      Timestamp dbEventTimestamp = 
resultSet.getTimestamp("utc_event_timestamp", UTC_CAL.get());
+      Timestamp dbLeaseAcquisitionTimestamp = 
resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL.get());
       boolean withinEpsilon = resultSet.getBoolean("is_within_epsilon");
       int leaseValidityStatus = resultSet.getInt("lease_validity_status");
       int dbLinger = resultSet.getInt("linger");
-      Timestamp dbCurrentTimestamp = 
resultSet.getTimestamp("utc_current_timestamp", UTC_CAL);
+      Timestamp dbCurrentTimestamp = 
resultSet.getTimestamp("utc_current_timestamp", UTC_CAL.get());
       return new GetEventInfoResult(dbEventTimestamp, 
dbLeaseAcquisitionTimestamp, withinEpsilon, leaseValidityStatus,
           dbLinger, dbCurrentTimestamp);
     } catch (SQLException e) {
@@ -423,14 +465,14 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
           throw new IOException("Expected resultSet containing row information 
for the lease that was attempted but "
               + "received nothing.");
         }
-        if (resultSet.getTimestamp("utc_event_timestamp", UTC_CAL) == null) {
+        if (resultSet.getTimestamp("utc_event_timestamp", UTC_CAL.get()) == 
null) {
           throw new IOException("event_timestamp should never be null (it is 
always set to current timestamp)");
         }
-        long eventTimeMillis = resultSet.getTimestamp("utc_event_timestamp", 
UTC_CAL).getTime();
+        long eventTimeMillis = resultSet.getTimestamp("utc_event_timestamp", 
UTC_CAL.get()).getTime();
         // Lease acquisition timestamp is null if another participant has 
completed the lease
         Optional<Long> leaseAcquisitionTimeMillis =
-            resultSet.getTimestamp("utc_lease_acquisition_timestamp", UTC_CAL) 
== null ? Optional.empty() :
-            
Optional.of(resultSet.getTimestamp("utc_lease_acquisition_timestamp", 
UTC_CAL).getTime());
+            resultSet.getTimestamp("utc_lease_acquisition_timestamp", 
UTC_CAL.get()) == null ? Optional.empty() :
+            
Optional.of(resultSet.getTimestamp("utc_lease_acquisition_timestamp", 
UTC_CAL.get()).getTime());
         int dbLinger = resultSet.getInt("linger");
         return new SelectInfoResult(eventTimeMillis, 
leaseAcquisitionTimeMillis, dbLinger);
       } catch (SQLException e) {
@@ -526,10 +568,10 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
     statement.setString(++i, flowAction.getFlowActionType().toString());
     // Values that may be needed depending on the insert statement
     if (needEventTimeCheck) {
-      statement.setTimestamp(++i, originalEventTimestamp, UTC_CAL);
+      statement.setTimestamp(++i, originalEventTimestamp, UTC_CAL.get());
     }
     if (needLeaseAcquisitionTimeCheck) {
-      statement.setTimestamp(++i, originalLeaseAcquisitionTimestamp, UTC_CAL);
+      statement.setTimestamp(++i, originalLeaseAcquisitionTimestamp, 
UTC_CAL.get());
     }
   }
 
@@ -546,8 +588,8 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
           updateStatement.setString(++i, flowGroup);
           updateStatement.setString(++i, flowName);
           updateStatement.setString(++i, flowActionType.toString());
-          updateStatement.setTimestamp(++i, new 
Timestamp(status.getEventTimestamp()), UTC_CAL);
-          updateStatement.setTimestamp(++i, new 
Timestamp(status.getLeaseAcquisitionTimestamp()), UTC_CAL);
+          updateStatement.setTimestamp(++i, new 
Timestamp(status.getEventTimestamp()), UTC_CAL.get());
+          updateStatement.setTimestamp(++i, new 
Timestamp(status.getLeaseAcquisitionTimestamp()), UTC_CAL.get());
           int numRowsUpdated = updateStatement.executeUpdate();
           if (numRowsUpdated == 0) {
             log.info("Multi-active lease arbiter lease attempt: [{}, 
eventTimestamp: {}] - FAILED to complete because "
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
index ab5faee8c..4f639e04a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/dag_action_store/MysqlDagActionStore.java
@@ -173,7 +173,6 @@ public class MysqlDagActionStore implements DagActionStore {
         rs.close();
       }
     }
-
   }
 
   @Override

Reply via email to