umustafi commented on code in PR #3715:
URL: https://github.com/apache/gobblin/pull/3715#discussion_r1261809318


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -137,79 +146,98 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws 
IOException {
     this.linger = ConfigUtils.getInt(config, 
ConfigurationKeys.SCHEDULER_EVENT_LINGER_MILLIS_KEY,
         ConfigurationKeys.DEFAULT_SCHEDULER_EVENT_LINGER_MILLIS);
     this.dataSource = MysqlDataSourceFactory.get(config, 
SharedResourcesBrokerFactory.getImplicitBroker());
+    String createArbiterStatement = String.format(
+        CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName);
     try (Connection connection = dataSource.getConnection();
-        PreparedStatement createStatement = 
connection.prepareStatement(String.format(
-            CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName))) {
+        PreparedStatement createStatement = 
connection.prepareStatement(createArbiterStatement)) {
       createStatement.executeUpdate();
       connection.commit();
     } catch (SQLException e) {
       throw new IOException("Table creation failure for " + 
leaseArbiterTableName, e);
     }
-    withPreparedStatement(String.format(CREATE_CONSTANTS_TABLE_STATEMENT, 
this.constantsTableName, this.constantsTableName),
-        createStatement -> {
-      int i = 0;
-      createStatement.setInt(++i, epsilon);
-      createStatement.setInt(++i, linger);
-      return createStatement.executeUpdate();}, true);
+    String createConstantsStatement = 
String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
+    withPreparedStatement(createConstantsStatement, createStatement -> 
createStatement.executeUpdate(), true);
+
+    int count = withPreparedStatement(String.format(GET_ROW_COUNT_STATEMENT, 
this.constantsTableName), getStatement -> {
+      ResultSet resultSet = getStatement.executeQuery();
+      if (resultSet.next()) {
+        return resultSet.getInt(1);
+      }
+      return -1;
+    }, true);
+
+    // Only insert epsilon and linger values from config if this table does 
not contain pre-existing values.
+    if (count == 0) {
+      String insertConstantsStatement = 
String.format(INSERT_IN_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
+      withPreparedStatement(insertConstantsStatement, insertStatement -> {
+        int i = 0;
+        insertStatement.setInt(++i, epsilon);
+        insertStatement.setInt(++i, linger);
+        return insertStatement.executeUpdate();
+      }, true);
+    }
+
+    log.info("MysqlMultiActiveLeaseArbiter initialized");
   }
 
   @Override
   public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction 
flowAction, long eventTimeMillis)
       throws IOException {
     // Check table for an existing entry for this flow action and event time
-    ResultSet resultSet = withPreparedStatement(
+    GetEventInfoResult getResult = withPreparedStatement(
         String.format(GET_EVENT_INFO_STATEMENT, this.leaseArbiterTableName, 
this.constantsTableName),
         getInfoStatement -> {
           int i = 0;
-          getInfoStatement.setTimestamp(++i, new Timestamp(eventTimeMillis));
           getInfoStatement.setString(++i, flowAction.getFlowGroup());
           getInfoStatement.setString(++i, flowAction.getFlowName());
           getInfoStatement.setString(++i, flowAction.getFlowExecutionId());
           getInfoStatement.setString(++i, 
flowAction.getFlowActionType().toString());
-          return getInfoStatement.executeQuery();
+          ResultSet resultSet = getInfoStatement.executeQuery();
+          if (!resultSet.next()) {
+            return null;
+          }
+          return new GetEventInfoResult(resultSet);
         }, true);
 
-    String formattedSelectAfterInsertStatement =
-        String.format(SELECT_AFTER_INSERT_STATEMENT, 
this.leaseArbiterTableName, this.constantsTableName);
     try {
       // CASE 1: If no existing row for this flow action, then go ahead and 
insert
-      if (!resultSet.next()) {
+      if (getResult == null) {
+        log.debug("CASE 1: no existing row for this flow action, then go ahead 
and insert");

Review Comment:
   Let me move these all to debug level logs instead of comments. I am 
numbering these cases here explicitly actually and referencing these numbers in 
the unit tests. They are not defined elsewhere. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to