This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d9f83e638 [GOBBLIN-2096] Retry Transient SQL Exception for MALA
(#3982)
d9f83e638 is described below
commit d9f83e638b6eb34fa85b0317eee31cf39c1e8546
Author: umustafi <[email protected]>
AuthorDate: Thu Jun 20 08:59:56 2024 -0700
[GOBBLIN-2096] Retry Transient SQL Exception for MALA (#3982)
* Add retries to RetryableSQLException for MALA
* Throw exception if max retries are exceeded
---
.../MysqlMultiActiveLeaseArbiter.java | 24 +++++++++++++++++++---
.../MysqlMultiActiveLeaseArbiterTest.java | 9 ++++++--
2 files changed, 28 insertions(+), 5 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 4b910219b..7189613d9 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -23,6 +23,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
+import java.sql.SQLTransientException;
import java.sql.Timestamp;
import java.util.Calendar;
import java.util.Optional;
@@ -41,6 +42,7 @@ import org.apache.gobblin.metastore.MysqlDataSourceFactory;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.DBStatementExecutor;
+import org.apache.gobblin.util.ExponentialBackoff;
/**
@@ -171,6 +173,9 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
// Complete lease acquisition if values have not changed since lease was
acquired
protected static final String CONDITIONALLY_COMPLETE_LEASE_STATEMENT =
"UPDATE %s SET "
+ "event_timestamp=event_timestamp, lease_acquisition_timestamp = NULL "
+ WHERE_CLAUSE_TO_MATCH_ROW;
+ protected static final int MAX_RETRIES = 3;
+ protected static final long MIN_INITIAL_DELAY_MILLIS = 20L;
+ protected static final long DELAY_FOR_RETRY_RANGE_MILLIS = 200L;
private static final ThreadLocal<Calendar> UTC_CAL =
ThreadLocal.withInitial(() ->
Calendar.getInstance(TimeZone.getTimeZone("UTC")));
@@ -254,7 +259,10 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
log.debug("tryAcquireLease for [{}, is; {}, eventTimestamp: {}] - CASE
1: no existing row for this dag action,"
+ " then go ahead and insert", dagActionLeaseObject.getDagAction(),
dagActionLeaseObject.isReminder() ? "reminder" : "original",
dagActionLeaseObject.getEventTimeMillis());
- int numRowsUpdated =
attemptLeaseIfNewRow(dagActionLeaseObject.getDagAction());
+ int numRowsUpdated =
attemptLeaseIfNewRow(dagActionLeaseObject.getDagAction(),
+ ExponentialBackoff.builder().maxRetries(MAX_RETRIES)
+ .initialDelay(MIN_INITIAL_DELAY_MILLIS + (long) (Math.random()
* DELAY_FOR_RETRY_RANGE_MILLIS)) // scale before casting: (long) Math.random() alone is always 0, i.e. no jitter
+ .build());
return evaluateStatusAfterLeaseAttempt(numRowsUpdated,
dagActionLeaseObject.getDagAction(),
Optional.empty(), dagActionLeaseObject.isReminder(),
adoptConsensusFlowExecutionId);
}
@@ -420,7 +428,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
* near past for it.
* @return int corresponding to number of rows updated by INSERT statement
to acquire lease
*/
- protected int attemptLeaseIfNewRow(DagActionStore.DagAction dagAction)
throws IOException {
+ protected int attemptLeaseIfNewRow(DagActionStore.DagAction dagAction,
ExponentialBackoff exponentialBackoff) throws IOException {
String formattedAcquireLeaseNewRowStatement =
String.format(ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT,
this.leaseArbiterTableName);
return
dbStatementExecutor.withPreparedStatement(formattedAcquireLeaseNewRowStatement,
@@ -428,7 +436,17 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
completeInsertPreparedStatement(insertStatement, dagAction);
try {
return insertStatement.executeUpdate();
- } catch (SQLIntegrityConstraintViolationException e) {
+ } catch (SQLTransientException e) {
+ try {
+ if (exponentialBackoff.awaitNextRetryIfAvailable()) {
+ return attemptLeaseIfNewRow(dagAction, exponentialBackoff);
+ }
+ } catch (InterruptedException e2) {
+ throw new IOException(e2);
+ }
+ throw e;
+ }
+ catch (SQLIntegrityConstraintViolationException e) {
if (!e.getMessage().contains("Duplicate entry")) {
throw e;
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
index b9abc37bf..0f97a290d 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
@@ -23,6 +23,7 @@ import java.sql.Timestamp;
import java.util.Optional;
import java.util.UUID;
+import org.apache.gobblin.util.ExponentialBackoff;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -201,9 +202,13 @@ public class MysqlMultiActiveLeaseArbiterTest {
@Test
public void testAcquireLeaseIfNewRow() throws IOException {
// Inserting the first time should update 1 row
-
Assert.assertEquals(mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction),
1);
+
Assert.assertEquals(mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction,
+
ExponentialBackoff.builder().maxRetries(MysqlMultiActiveLeaseArbiter.MAX_RETRIES)
+
.initialDelay(MysqlMultiActiveLeaseArbiter.MIN_INITIAL_DELAY_MILLIS).build()),
1);
// Inserting the second time should not update any rows
-
Assert.assertEquals(mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction),
0);
+
Assert.assertEquals(mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction,
+
ExponentialBackoff.builder().maxRetries(MysqlMultiActiveLeaseArbiter.MAX_RETRIES)
+
.initialDelay(MysqlMultiActiveLeaseArbiter.MIN_INITIAL_DELAY_MILLIS).build()),
0);
}
/*