[ 
https://issues.apache.org/jira/browse/GOBBLIN-1851?focusedWorklogId=871620&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-871620
 ]

ASF GitHub Bot logged work on GOBBLIN-1851:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Jul/23 19:41
            Start Date: 18/Jul/23 19:41
    Worklog Time Spent: 10m 
      Work Description: umustafi commented on code in PR #3715:
URL: https://github.com/apache/gobblin/pull/3715#discussion_r1267236668


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -219,84 +258,142 @@ else if (leaseValidityStatus == 2) {
         // Use our event to acquire lease, check for previous db 
eventTimestamp and leaseAcquisitionTimestamp
         String formattedAcquireLeaseIfMatchingAllStatement =
             
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT, 
this.leaseArbiterTableName);
-        ResultSet rs = withPreparedStatement(
-            formattedAcquireLeaseIfMatchingAllStatement + "; " + 
formattedSelectAfterInsertStatement,
-            updateStatement -> {
-              completeUpdatePreparedStatement(updateStatement, flowAction, 
eventTimeMillis, true,
+        int numRowsUpdated = 
withPreparedStatement(formattedAcquireLeaseIfMatchingAllStatement,
+            insertStatement -> {
+              completeUpdatePreparedStatement(insertStatement, flowAction, 
true,
                   true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
-              return updateStatement.executeQuery();
+              return insertStatement.executeUpdate();
             }, true);
-        return handleResultFromAttemptedLeaseObtainment(rs, flowAction, 
eventTimeMillis);
+        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.of(dbCurrentTimestamp));
       } // No longer leasing this event
-        // CASE 6: Same event, no longer leasing event in db: terminate
         if (isWithinEpsilon) {
+          log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 5: 
Same event, no longer leasing event in db: "
+              + "terminate", flowAction, dbCurrentTimestamp.getTime());
           return new NoLongerLeasingStatus();
         }
-        // CASE 7: Distinct event, no longer leasing event in db
+        log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 6: 
Distinct event, no longer leasing event in "
+            + "db", flowAction, dbCurrentTimestamp.getTime());
         // Use our event to acquire lease, check for previous db 
eventTimestamp and NULL leaseAcquisitionTimestamp
         String formattedAcquireLeaseIfFinishedStatement =
             
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, 
this.leaseArbiterTableName);
-        ResultSet rs = withPreparedStatement(
-            formattedAcquireLeaseIfFinishedStatement + "; " + 
formattedSelectAfterInsertStatement,
-            updateStatement -> {
-              completeUpdatePreparedStatement(updateStatement, flowAction, 
eventTimeMillis, true,
+        int numRowsUpdated = 
withPreparedStatement(formattedAcquireLeaseIfFinishedStatement,
+            insertStatement -> {
+              completeUpdatePreparedStatement(insertStatement, flowAction, 
true,
                   false, dbEventTimestamp, null);
-              return updateStatement.executeQuery();
+              return insertStatement.executeUpdate();
             }, true);
-        return handleResultFromAttemptedLeaseObtainment(rs, flowAction, 
eventTimeMillis);
+        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.of(dbCurrentTimestamp));
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
   }
 
+  protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws 
IOException {
+    try {
+      // Extract values from result set
+      Timestamp dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
+      Timestamp dbLeaseAcquisitionTimestamp = 
resultSet.getTimestamp("lease_acquisition_timestamp");
+      boolean withinEpsilon = resultSet.getBoolean("is_within_epsilon");
+      int leaseValidityStatus = resultSet.getInt("lease_validity_status");
+      int dbLinger = resultSet.getInt("linger");
+      Timestamp dbCurrentTimestamp = 
resultSet.getTimestamp("CURRENT_TIMESTAMP");
+      return new GetEventInfoResult(dbEventTimestamp, 
dbLeaseAcquisitionTimestamp, withinEpsilon, leaseValidityStatus,
+          dbLinger, dbCurrentTimestamp);
+    } catch (SQLException e) {
+      throw new IOException(e);
+    } finally {
+      if (resultSet != null) {
+        try {
+          resultSet.close();
+        } catch (SQLException e) {
+          throw new IOException(e);
+        }
+      }
+    }
+  }
+
+  protected SelectInfoResult createSelectInfoResult(ResultSet resultSet) 
throws IOException {
+      try {
+        if (!resultSet.next()) {
+          log.error("Expected num rows and lease_acquisition_timestamp 
returned from query but received nothing");

Review Comment:
   Oh, good catch. I want this code path to terminate, so I will throw an 
IO error instead. 



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -219,84 +258,142 @@ else if (leaseValidityStatus == 2) {
         // Use our event to acquire lease, check for previous db 
eventTimestamp and leaseAcquisitionTimestamp
         String formattedAcquireLeaseIfMatchingAllStatement =
             
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT, 
this.leaseArbiterTableName);
-        ResultSet rs = withPreparedStatement(
-            formattedAcquireLeaseIfMatchingAllStatement + "; " + 
formattedSelectAfterInsertStatement,
-            updateStatement -> {
-              completeUpdatePreparedStatement(updateStatement, flowAction, 
eventTimeMillis, true,
+        int numRowsUpdated = 
withPreparedStatement(formattedAcquireLeaseIfMatchingAllStatement,
+            insertStatement -> {
+              completeUpdatePreparedStatement(insertStatement, flowAction, 
true,
                   true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
-              return updateStatement.executeQuery();
+              return insertStatement.executeUpdate();
             }, true);
-        return handleResultFromAttemptedLeaseObtainment(rs, flowAction, 
eventTimeMillis);
+        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.of(dbCurrentTimestamp));
       } // No longer leasing this event
-        // CASE 6: Same event, no longer leasing event in db: terminate
         if (isWithinEpsilon) {
+          log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 5: 
Same event, no longer leasing event in db: "
+              + "terminate", flowAction, dbCurrentTimestamp.getTime());
           return new NoLongerLeasingStatus();
         }
-        // CASE 7: Distinct event, no longer leasing event in db
+        log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 6: 
Distinct event, no longer leasing event in "
+            + "db", flowAction, dbCurrentTimestamp.getTime());
         // Use our event to acquire lease, check for previous db 
eventTimestamp and NULL leaseAcquisitionTimestamp
         String formattedAcquireLeaseIfFinishedStatement =
             
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, 
this.leaseArbiterTableName);
-        ResultSet rs = withPreparedStatement(
-            formattedAcquireLeaseIfFinishedStatement + "; " + 
formattedSelectAfterInsertStatement,
-            updateStatement -> {
-              completeUpdatePreparedStatement(updateStatement, flowAction, 
eventTimeMillis, true,
+        int numRowsUpdated = 
withPreparedStatement(formattedAcquireLeaseIfFinishedStatement,
+            insertStatement -> {
+              completeUpdatePreparedStatement(insertStatement, flowAction, 
true,
                   false, dbEventTimestamp, null);
-              return updateStatement.executeQuery();
+              return insertStatement.executeUpdate();
             }, true);
-        return handleResultFromAttemptedLeaseObtainment(rs, flowAction, 
eventTimeMillis);
+        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.of(dbCurrentTimestamp));
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
   }
 
+  protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws 
IOException {
+    try {
+      // Extract values from result set
+      Timestamp dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
+      Timestamp dbLeaseAcquisitionTimestamp = 
resultSet.getTimestamp("lease_acquisition_timestamp");
+      boolean withinEpsilon = resultSet.getBoolean("is_within_epsilon");
+      int leaseValidityStatus = resultSet.getInt("lease_validity_status");
+      int dbLinger = resultSet.getInt("linger");
+      Timestamp dbCurrentTimestamp = 
resultSet.getTimestamp("CURRENT_TIMESTAMP");
+      return new GetEventInfoResult(dbEventTimestamp, 
dbLeaseAcquisitionTimestamp, withinEpsilon, leaseValidityStatus,
+          dbLinger, dbCurrentTimestamp);
+    } catch (SQLException e) {
+      throw new IOException(e);
+    } finally {
+      if (resultSet != null) {
+        try {
+          resultSet.close();
+        } catch (SQLException e) {
+          throw new IOException(e);
+        }
+      }
+    }
+  }
+
+  protected SelectInfoResult createSelectInfoResult(ResultSet resultSet) 
throws IOException {
+      try {
+        if (!resultSet.next()) {
+          log.error("Expected num rows and lease_acquisition_timestamp 
returned from query but received nothing");

Review Comment:
   Oh, good catch. I want this code path to terminate, so I will throw an 
IO error instead. 





Issue Time Tracking
-------------------

    Worklog Id:     (was: 871620)
    Time Spent: 1h 40m  (was: 1.5h)

> Unit Testing of Multi-active Algorithm
> --------------------------------------
>
>                 Key: GOBBLIN-1851
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1851
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Tests all cases of trying to acquire a lease for a flow action event with one 
> participant involved and makes corresponding fixes in the 
> `MultiActiveLeaseArbiter`. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to