This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d9f83e638 [GOBBLIN-2096] Retry Transient SQL Exception for MALA  
(#3982)
d9f83e638 is described below

commit d9f83e638b6eb34fa85b0317eee31cf39c1e8546
Author: umustafi <[email protected]>
AuthorDate: Thu Jun 20 08:59:56 2024 -0700

    [GOBBLIN-2096] Retry Transient SQL Exception for MALA  (#3982)
    
    * Add retries to RetryableSQLException for MALA
    * Throw exception if max retries are exceeded
---
 .../MysqlMultiActiveLeaseArbiter.java              | 24 +++++++++++++++++++---
 .../MysqlMultiActiveLeaseArbiterTest.java          |  9 ++++++--
 2 files changed, 28 insertions(+), 5 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 4b910219b..7189613d9 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -23,6 +23,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.SQLIntegrityConstraintViolationException;
+import java.sql.SQLTransientException;
 import java.sql.Timestamp;
 import java.util.Calendar;
 import java.util.Optional;
@@ -41,6 +42,7 @@ import org.apache.gobblin.metastore.MysqlDataSourceFactory;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.DBStatementExecutor;
+import org.apache.gobblin.util.ExponentialBackoff;
 
 
 /**
@@ -171,6 +173,9 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
   // Complete lease acquisition if values have not changed since lease was 
acquired
   protected static final String CONDITIONALLY_COMPLETE_LEASE_STATEMENT = 
"UPDATE %s SET "
       + "event_timestamp=event_timestamp, lease_acquisition_timestamp = NULL " 
+ WHERE_CLAUSE_TO_MATCH_ROW;
+  protected static final int MAX_RETRIES = 3;
+  protected static final long MIN_INITIAL_DELAY_MILLIS = 20L;
+  protected static final long DELAY_FOR_RETRY_RANGE_MILLIS = 200L;
   private static final ThreadLocal<Calendar> UTC_CAL =
       ThreadLocal.withInitial(() -> 
Calendar.getInstance(TimeZone.getTimeZone("UTC")));
 
@@ -254,7 +259,10 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
         log.debug("tryAcquireLease for [{}, is; {}, eventTimestamp: {}] - CASE 
1: no existing row for this dag action,"
             + " then go ahead and insert", dagActionLeaseObject.getDagAction(),
             dagActionLeaseObject.isReminder() ? "reminder" : "original", 
dagActionLeaseObject.getEventTimeMillis());
-        int numRowsUpdated = 
attemptLeaseIfNewRow(dagActionLeaseObject.getDagAction());
+        int numRowsUpdated = 
attemptLeaseIfNewRow(dagActionLeaseObject.getDagAction(),
+            ExponentialBackoff.builder().maxRetries(MAX_RETRIES)
+                .initialDelay(MIN_INITIAL_DELAY_MILLIS + (long) Math.random() 
* DELAY_FOR_RETRY_RANGE_MILLIS)
+                .build());
        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, 
dagActionLeaseObject.getDagAction(),
            Optional.empty(), dagActionLeaseObject.isReminder(), 
adoptConsensusFlowExecutionId);
       }
@@ -420,7 +428,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
    * near past for it.
    * @return int corresponding to number of rows updated by INSERT statement 
to acquire lease
    */
-  protected int attemptLeaseIfNewRow(DagActionStore.DagAction dagAction) 
throws IOException {
+  protected int attemptLeaseIfNewRow(DagActionStore.DagAction dagAction, 
ExponentialBackoff exponentialBackoff) throws IOException {
     String formattedAcquireLeaseNewRowStatement =
         String.format(ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, 
this.leaseArbiterTableName);
     return 
dbStatementExecutor.withPreparedStatement(formattedAcquireLeaseNewRowStatement,
@@ -428,7 +436,17 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
           completeInsertPreparedStatement(insertStatement, dagAction);
           try {
             return insertStatement.executeUpdate();
-          } catch (SQLIntegrityConstraintViolationException e) {
+          } catch (SQLTransientException e) {
+            try {
+              if (exponentialBackoff.awaitNextRetryIfAvailable()) {
+                return attemptLeaseIfNewRow(dagAction, exponentialBackoff);
+              }
+            } catch (InterruptedException e2) {
+              throw new IOException(e2);
+            }
+            throw e; 
+          }
+          catch (SQLIntegrityConstraintViolationException e) {
             if (!e.getMessage().contains("Duplicate entry")) {
               throw e;
             }
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
index b9abc37bf..0f97a290d 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
@@ -23,6 +23,7 @@ import java.sql.Timestamp;
 import java.util.Optional;
 import java.util.UUID;
 
+import org.apache.gobblin.util.ExponentialBackoff;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -201,9 +202,13 @@ public class MysqlMultiActiveLeaseArbiterTest {
   @Test
   public void testAcquireLeaseIfNewRow() throws IOException {
     // Inserting the first time should update 1 row
-    
Assert.assertEquals(mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction),
 1);
+    
Assert.assertEquals(mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction,
+        
ExponentialBackoff.builder().maxRetries(MysqlMultiActiveLeaseArbiter.MAX_RETRIES)
+            
.initialDelay(MysqlMultiActiveLeaseArbiter.MIN_INITIAL_DELAY_MILLIS).build()), 
1);
     // Inserting the second time should not update any rows
-    
Assert.assertEquals(mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction),
 0);
+    
Assert.assertEquals(mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction,
+        
ExponentialBackoff.builder().maxRetries(MysqlMultiActiveLeaseArbiter.MAX_RETRIES)
+            
.initialDelay(MysqlMultiActiveLeaseArbiter.MIN_INITIAL_DELAY_MILLIS).build()), 
0);
   }
 
     /*

Reply via email to