This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d600b52dd [GOBBLIN-1907] Handle Lease Completion From Other
Multi-active Participants (#3771)
d600b52dd is described below
commit d600b52dd20c21cf26203d0852613db655fb7e96
Author: umustafi <[email protected]>
AuthorDate: Wed Sep 13 17:12:29 2023 -0700
[GOBBLIN-1907] Handle Lease Completion From Other Multi-active Participants
(#3771)
* Handle Lease Completion From Other Multi-active Participants
* Use optional instead of sentinel value when lease acquisition timestamp
null
* Use Java Optional instead of Guava
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../runtime/api/MysqlMultiActiveLeaseArbiter.java | 27 ++++++++++++++--------
.../api/MysqlMultiActiveLeaseArbiterTest.java | 13 +++++++----
.../scheduler/GobblinServiceJobScheduler.java | 4 +++-
3 files changed, 30 insertions(+), 14 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index 33ab56189..6352a1ff1 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -17,7 +17,6 @@
package org.apache.gobblin.runtime.api;
-import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.typesafe.config.Config;
import com.zaxxer.hikari.HikariDataSource;
@@ -28,6 +27,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.Timestamp;
+import java.util.Optional;
import javax.sql.DataSource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@@ -200,7 +200,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 1: no
existing row for this flow action, then go"
+ " ahead and insert", flowAction, eventTimeMillis);
int numRowsUpdated = attemptLeaseIfNewRow(flowAction);
- return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.absent());
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.empty());
}
// Extract values from result set
@@ -273,7 +273,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
ResultSet resultSet = getInfoStatement.executeQuery();
try {
if (!resultSet.next()) {
- return Optional.<GetEventInfoResult>absent();
+ return Optional.<GetEventInfoResult>empty();
}
return Optional.of(createGetInfoResult(resultSet));
} finally {
@@ -367,11 +367,16 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
protected static SelectInfoResult createSelectInfoResult(ResultSet
resultSet) throws IOException {
try {
if (!resultSet.next()) {
- throw new IOException("Expected num rows and
lease_acquisition_timestamp returned from query but received nothing, so "
- + "providing empty result to lease evaluation code");
+ throw new IOException("Expected resultSet containing row information
for the lease that was attempted but "
+ + "received nothing.");
+ }
+ if (resultSet.getTimestamp(1) == null) {
+ throw new IOException("event_timestamp should never be null (it is
always set to current timestamp)");
}
long eventTimeMillis = resultSet.getTimestamp(1).getTime();
- long leaseAcquisitionTimeMillis = resultSet.getTimestamp(2).getTime();
+ // Lease acquisition timestamp is null if another participant has
completed the lease
+ Optional<Long> leaseAcquisitionTimeMillis = resultSet.getTimestamp(2)
== null ? Optional.empty() :
+ Optional.of(resultSet.getTimestamp(2).getTime());
int dbLinger = resultSet.getInt(3);
return new SelectInfoResult(eventTimeMillis,
leaseAcquisitionTimeMillis, dbLinger);
} catch (SQLException e) {
@@ -399,17 +404,21 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
throws SQLException, IOException {
// Fetch values in row after attempted insert
SelectInfoResult selectInfoResult = getRowInfo(flowAction);
+ // Another participant won the lease in between
+ if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) {
+ return new NoLongerLeasingStatus();
+ }
if (numRowsUpdated == 1) {
log.debug("Obtained lease for [{}, eventTimestamp: {}] successfully!",
flowAction,
selectInfoResult.eventTimeMillis);
return new LeaseObtainedStatus(flowAction,
selectInfoResult.eventTimeMillis,
- selectInfoResult.getLeaseAcquisitionTimeMillis());
+ selectInfoResult.getLeaseAcquisitionTimeMillis().get());
}
log.debug("Another participant acquired lease in between for [{},
eventTimestamp: {}] - num rows updated: ",
flowAction, selectInfoResult.eventTimeMillis, numRowsUpdated);
// Another participant acquired lease in between
return new LeasedToAnotherStatus(flowAction,
selectInfoResult.getEventTimeMillis(),
- selectInfoResult.getLeaseAcquisitionTimeMillis() +
selectInfoResult.getDbLinger()
+ selectInfoResult.getLeaseAcquisitionTimeMillis().get() +
selectInfoResult.getDbLinger()
- (dbCurrentTimestamp.isPresent() ?
dbCurrentTimestamp.get().getTime() : System.currentTimeMillis()));
}
@@ -545,7 +554,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
@Data
static class SelectInfoResult {
private final long eventTimeMillis;
- private final long leaseAcquisitionTimeMillis;
+ private final Optional<Long> leaseAcquisitionTimeMillis;
private final int dbLinger;
}
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
index 1eb872586..5a3b9da15 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
@@ -19,7 +19,9 @@ package org.apache.gobblin.runtime.api;
import com.typesafe.config.Config;
import java.io.IOException;
+import java.sql.SQLException;
import java.sql.Timestamp;
+import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -183,7 +185,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
// The following insert will fail since the eventTimestamp does not match
int numRowsUpdated =
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true,
true,
- new Timestamp(99999), new
Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis()));
+ new Timestamp(99999), new
Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
Assert.assertEquals(numRowsUpdated, 0);
// The following insert will fail since the leaseAcquisitionTimestamp does
not match
@@ -196,7 +198,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
numRowsUpdated =
this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true,
true,
new Timestamp(selectInfoResult.getEventTimeMillis()),
- new Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis()));
+ new Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
Assert.assertEquals(numRowsUpdated, 1);
}
@@ -207,13 +209,16 @@ public class MysqlMultiActiveLeaseArbiterTest {
its prior lease, encouraging the current participant to acquire a lease for
its event.
*/
@Test (dependsOnMethods =
"testConditionallyAcquireLeaseIfFMatchingAllColsStatement")
- public void testConditionallyAcquireLeaseIfFinishedLeasingStatement() throws
IOException, InterruptedException {
+ public void testConditionallyAcquireLeaseIfFinishedLeasingStatement()
+ throws IOException, InterruptedException, SQLException {
// Mark the resume action lease from above as completed by fabricating a
LeaseObtainedStatus
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
this.mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
boolean markedSuccess =
this.mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
- resumeDagAction, selectInfoResult.getEventTimeMillis(),
selectInfoResult.getLeaseAcquisitionTimeMillis()));
+ resumeDagAction, selectInfoResult.getEventTimeMillis(),
selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
Assert.assertTrue(markedSuccess);
+ // Ensure no NPE results from calling this after a lease has been
completed and acquisition timestamp val is NULL
+ mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1,
resumeDagAction, Optional.empty());
// Sleep enough time for event to be considered distinct
Thread.sleep(LINGER);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index ad90d7188..b773e8c20 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -496,7 +496,9 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL);
this.orchestrator.orchestrate(flowSpec, jobProps,
Long.parseLong(triggerTimestampMillis));
} catch (Exception e) {
- throw new JobException("Failed to run Spec: " +
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
+ String exceptionPrefix = "Failed to run Spec: " +
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
+ log.warn(exceptionPrefix + " because", e);
+ throw new JobException(exceptionPrefix, e);
}
}