This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d600b52dd [GOBBLIN-1907] Handle Lease Completion From Other 
Multi-active Participants (#3771)
d600b52dd is described below

commit d600b52dd20c21cf26203d0852613db655fb7e96
Author: umustafi <[email protected]>
AuthorDate: Wed Sep 13 17:12:29 2023 -0700

    [GOBBLIN-1907] Handle Lease Completion From Other Multi-active Participants 
(#3771)
    
    * Handle Lease Completion From Other Multi-active Participants
    
    * Use optional instead of sentinel value when lease acquisition timestamp 
null
    
    * Use Java Optional instead of Guava
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../runtime/api/MysqlMultiActiveLeaseArbiter.java  | 27 ++++++++++++++--------
 .../api/MysqlMultiActiveLeaseArbiterTest.java      | 13 +++++++----
 .../scheduler/GobblinServiceJobScheduler.java      |  4 +++-
 3 files changed, 30 insertions(+), 14 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index 33ab56189..6352a1ff1 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -17,7 +17,6 @@
 
 package org.apache.gobblin.runtime.api;
 
-import com.google.common.base.Optional;
 import com.google.inject.Inject;
 import com.typesafe.config.Config;
 import com.zaxxer.hikari.HikariDataSource;
@@ -28,6 +27,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.SQLIntegrityConstraintViolationException;
 import java.sql.Timestamp;
+import java.util.Optional;
 import javax.sql.DataSource;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
@@ -200,7 +200,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
         log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 1: no 
existing row for this flow action, then go"
                 + " ahead and insert", flowAction, eventTimeMillis);
         int numRowsUpdated = attemptLeaseIfNewRow(flowAction);
-       return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.absent());
+       return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.empty());
       }
 
       // Extract values from result set
@@ -273,7 +273,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
           ResultSet resultSet = getInfoStatement.executeQuery();
           try {
             if (!resultSet.next()) {
-              return Optional.<GetEventInfoResult>absent();
+              return Optional.<GetEventInfoResult>empty();
             }
             return Optional.of(createGetInfoResult(resultSet));
           } finally {
@@ -367,11 +367,16 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   protected static SelectInfoResult createSelectInfoResult(ResultSet 
resultSet) throws IOException {
       try {
         if (!resultSet.next()) {
-          throw new IOException("Expected num rows and 
lease_acquisition_timestamp returned from query but received nothing, so "
-              + "providing empty result to lease evaluation code");
+          throw new IOException("Expected resultSet containing row information 
for the lease that was attempted but "
+              + "received nothing.");
+        }
+        if (resultSet.getTimestamp(1) == null) {
+          throw new IOException("event_timestamp should never be null (it is 
always set to current timestamp)");
         }
         long eventTimeMillis = resultSet.getTimestamp(1).getTime();
-        long leaseAcquisitionTimeMillis = resultSet.getTimestamp(2).getTime();
+        // Lease acquisition timestamp is null if another participant has 
completed the lease
+        Optional<Long> leaseAcquisitionTimeMillis = resultSet.getTimestamp(2) 
== null ? Optional.empty() :
+            Optional.of(resultSet.getTimestamp(2).getTime());
         int dbLinger = resultSet.getInt(3);
         return new SelectInfoResult(eventTimeMillis, 
leaseAcquisitionTimeMillis, dbLinger);
       } catch (SQLException e) {
@@ -399,17 +404,21 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
       throws SQLException, IOException {
     // Fetch values in row after attempted insert
     SelectInfoResult selectInfoResult = getRowInfo(flowAction);
+    // Another participant won the lease in between
+    if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) {
+      return new NoLongerLeasingStatus();
+    }
     if (numRowsUpdated == 1) {
       log.debug("Obtained lease for [{}, eventTimestamp: {}] successfully!", 
flowAction,
           selectInfoResult.eventTimeMillis);
       return new LeaseObtainedStatus(flowAction, 
selectInfoResult.eventTimeMillis,
-          selectInfoResult.getLeaseAcquisitionTimeMillis());
+          selectInfoResult.getLeaseAcquisitionTimeMillis().get());
     }
     log.debug("Another participant acquired lease in between for [{}, 
eventTimestamp: {}] - num rows updated: ",
         flowAction, selectInfoResult.eventTimeMillis, numRowsUpdated);
     // Another participant acquired lease in between
     return new LeasedToAnotherStatus(flowAction, 
selectInfoResult.getEventTimeMillis(),
-        selectInfoResult.getLeaseAcquisitionTimeMillis() + 
selectInfoResult.getDbLinger()
+        selectInfoResult.getLeaseAcquisitionTimeMillis().get() + 
selectInfoResult.getDbLinger()
             - (dbCurrentTimestamp.isPresent() ? 
dbCurrentTimestamp.get().getTime() : System.currentTimeMillis()));
   }
 
@@ -545,7 +554,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   @Data
   static class SelectInfoResult {
     private final long eventTimeMillis;
-    private final long leaseAcquisitionTimeMillis;
+    private final Optional<Long> leaseAcquisitionTimeMillis;
     private final int dbLinger;
   }
 }
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
index 1eb872586..5a3b9da15 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
@@ -19,7 +19,9 @@ package org.apache.gobblin.runtime.api;
 
 import com.typesafe.config.Config;
 import java.io.IOException;
+import java.sql.SQLException;
 import java.sql.Timestamp;
+import java.util.Optional;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -183,7 +185,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     // The following insert will fail since the eventTimestamp does not match
     int numRowsUpdated = 
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
         formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true, 
true,
-        new Timestamp(99999), new 
Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis()));
+        new Timestamp(99999), new 
Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
     Assert.assertEquals(numRowsUpdated, 0);
 
     // The following insert will fail since the leaseAcquisitionTimestamp does 
not match
@@ -196,7 +198,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
     numRowsUpdated = 
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
         formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true, 
true,
         new Timestamp(selectInfoResult.getEventTimeMillis()),
-        new Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis()));
+        new Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
     Assert.assertEquals(numRowsUpdated, 1);
   }
 
@@ -207,13 +209,16 @@ public class MysqlMultiActiveLeaseArbiterTest {
   its prior lease, encouraging the current participant to acquire a lease for 
its event.
    */
   @Test (dependsOnMethods = 
"testConditionallyAcquireLeaseIfFMatchingAllColsStatement")
-  public void testConditionallyAcquireLeaseIfFinishedLeasingStatement() throws 
IOException, InterruptedException {
+  public void testConditionallyAcquireLeaseIfFinishedLeasingStatement()
+      throws IOException, InterruptedException, SQLException {
     // Mark the resume action lease from above as completed by fabricating a 
LeaseObtainedStatus
     MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
         this.mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
     boolean markedSuccess = 
this.mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
-        resumeDagAction, selectInfoResult.getEventTimeMillis(), 
selectInfoResult.getLeaseAcquisitionTimeMillis()));
+        resumeDagAction, selectInfoResult.getEventTimeMillis(), 
selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
     Assert.assertTrue(markedSuccess);
+    // Ensure no NPE results from calling this after a lease has been 
completed and acquisition timestamp val is NULL
+    mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1, 
resumeDagAction, Optional.empty());
 
     // Sleep enough time for event to be considered distinct
     Thread.sleep(LINGER);
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index ad90d7188..b773e8c20 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -496,7 +496,9 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
           ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL);
       this.orchestrator.orchestrate(flowSpec, jobProps, 
Long.parseLong(triggerTimestampMillis));
     } catch (Exception e) {
-      throw new JobException("Failed to run Spec: " + 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
+      String exceptionPrefix = "Failed to run Spec: " + 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
+      log.warn(exceptionPrefix + " because", e);
+      throw new JobException(exceptionPrefix, e);
     }
   }
 

Reply via email to