[
https://issues.apache.org/jira/browse/GOBBLIN-1851?focusedWorklogId=871620&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-871620
]
ASF GitHub Bot logged work on GOBBLIN-1851:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 18/Jul/23 19:41
Start Date: 18/Jul/23 19:41
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #3715:
URL: https://github.com/apache/gobblin/pull/3715#discussion_r1267236668
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -219,84 +258,142 @@ else if (leaseValidityStatus == 2) {
// Use our event to acquire lease, check for previous db
eventTimestamp and leaseAcquisitionTimestamp
String formattedAcquireLeaseIfMatchingAllStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT,
this.leaseArbiterTableName);
- ResultSet rs = withPreparedStatement(
- formattedAcquireLeaseIfMatchingAllStatement + "; " +
formattedSelectAfterInsertStatement,
- updateStatement -> {
- completeUpdatePreparedStatement(updateStatement, flowAction,
eventTimeMillis, true,
+ int numRowsUpdated =
withPreparedStatement(formattedAcquireLeaseIfMatchingAllStatement,
+ insertStatement -> {
+ completeUpdatePreparedStatement(insertStatement, flowAction,
true,
true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
- return updateStatement.executeQuery();
+ return insertStatement.executeUpdate();
}, true);
- return handleResultFromAttemptedLeaseObtainment(rs, flowAction,
eventTimeMillis);
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.of(dbCurrentTimestamp));
} // No longer leasing this event
- // CASE 6: Same event, no longer leasing event in db: terminate
if (isWithinEpsilon) {
+ log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 5:
Same event, no longer leasing event in db: "
+ + "terminate", flowAction, dbCurrentTimestamp.getTime());
return new NoLongerLeasingStatus();
}
- // CASE 7: Distinct event, no longer leasing event in db
+ log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 6:
Distinct event, no longer leasing event in "
+ + "db", flowAction, dbCurrentTimestamp.getTime());
// Use our event to acquire lease, check for previous db
eventTimestamp and NULL leaseAcquisitionTimestamp
String formattedAcquireLeaseIfFinishedStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT,
this.leaseArbiterTableName);
- ResultSet rs = withPreparedStatement(
- formattedAcquireLeaseIfFinishedStatement + "; " +
formattedSelectAfterInsertStatement,
- updateStatement -> {
- completeUpdatePreparedStatement(updateStatement, flowAction,
eventTimeMillis, true,
+ int numRowsUpdated =
withPreparedStatement(formattedAcquireLeaseIfFinishedStatement,
+ insertStatement -> {
+ completeUpdatePreparedStatement(insertStatement, flowAction,
true,
false, dbEventTimestamp, null);
- return updateStatement.executeQuery();
+ return insertStatement.executeUpdate();
}, true);
- return handleResultFromAttemptedLeaseObtainment(rs, flowAction,
eventTimeMillis);
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.of(dbCurrentTimestamp));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
+ protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws
IOException {
+ try {
+ // Extract values from result set
+ Timestamp dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
+ Timestamp dbLeaseAcquisitionTimestamp =
resultSet.getTimestamp("lease_acquisition_timestamp");
+ boolean withinEpsilon = resultSet.getBoolean("is_within_epsilon");
+ int leaseValidityStatus = resultSet.getInt("lease_validity_status");
+ int dbLinger = resultSet.getInt("linger");
+ Timestamp dbCurrentTimestamp =
resultSet.getTimestamp("CURRENT_TIMESTAMP");
+ return new GetEventInfoResult(dbEventTimestamp,
dbLeaseAcquisitionTimestamp, withinEpsilon, leaseValidityStatus,
+ dbLinger, dbCurrentTimestamp);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ } finally {
+ if (resultSet != null) {
+ try {
+ resultSet.close();
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+ }
+
+ protected SelectInfoResult createSelectInfoResult(ResultSet resultSet)
throws IOException {
+ try {
+ if (!resultSet.next()) {
+ log.error("Expected num rows and lease_acquisition_timestamp
returned from query but received nothing");
Review Comment:
Oh, good catch. I want this code path to terminate, so I will throw an
IO error instead.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -219,84 +258,142 @@ else if (leaseValidityStatus == 2) {
// Use our event to acquire lease, check for previous db
eventTimestamp and leaseAcquisitionTimestamp
String formattedAcquireLeaseIfMatchingAllStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT,
this.leaseArbiterTableName);
- ResultSet rs = withPreparedStatement(
- formattedAcquireLeaseIfMatchingAllStatement + "; " +
formattedSelectAfterInsertStatement,
- updateStatement -> {
- completeUpdatePreparedStatement(updateStatement, flowAction,
eventTimeMillis, true,
+ int numRowsUpdated =
withPreparedStatement(formattedAcquireLeaseIfMatchingAllStatement,
+ insertStatement -> {
+ completeUpdatePreparedStatement(insertStatement, flowAction,
true,
true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
- return updateStatement.executeQuery();
+ return insertStatement.executeUpdate();
}, true);
- return handleResultFromAttemptedLeaseObtainment(rs, flowAction,
eventTimeMillis);
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.of(dbCurrentTimestamp));
} // No longer leasing this event
- // CASE 6: Same event, no longer leasing event in db: terminate
if (isWithinEpsilon) {
+ log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 5:
Same event, no longer leasing event in db: "
+ + "terminate", flowAction, dbCurrentTimestamp.getTime());
return new NoLongerLeasingStatus();
}
- // CASE 7: Distinct event, no longer leasing event in db
+ log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 6:
Distinct event, no longer leasing event in "
+ + "db", flowAction, dbCurrentTimestamp.getTime());
// Use our event to acquire lease, check for previous db
eventTimestamp and NULL leaseAcquisitionTimestamp
String formattedAcquireLeaseIfFinishedStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT,
this.leaseArbiterTableName);
- ResultSet rs = withPreparedStatement(
- formattedAcquireLeaseIfFinishedStatement + "; " +
formattedSelectAfterInsertStatement,
- updateStatement -> {
- completeUpdatePreparedStatement(updateStatement, flowAction,
eventTimeMillis, true,
+ int numRowsUpdated =
withPreparedStatement(formattedAcquireLeaseIfFinishedStatement,
+ insertStatement -> {
+ completeUpdatePreparedStatement(insertStatement, flowAction,
true,
false, dbEventTimestamp, null);
- return updateStatement.executeQuery();
+ return insertStatement.executeUpdate();
}, true);
- return handleResultFromAttemptedLeaseObtainment(rs, flowAction,
eventTimeMillis);
+ return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction,
Optional.of(dbCurrentTimestamp));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
+ protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws
IOException {
+ try {
+ // Extract values from result set
+ Timestamp dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
+ Timestamp dbLeaseAcquisitionTimestamp =
resultSet.getTimestamp("lease_acquisition_timestamp");
+ boolean withinEpsilon = resultSet.getBoolean("is_within_epsilon");
+ int leaseValidityStatus = resultSet.getInt("lease_validity_status");
+ int dbLinger = resultSet.getInt("linger");
+ Timestamp dbCurrentTimestamp =
resultSet.getTimestamp("CURRENT_TIMESTAMP");
+ return new GetEventInfoResult(dbEventTimestamp,
dbLeaseAcquisitionTimestamp, withinEpsilon, leaseValidityStatus,
+ dbLinger, dbCurrentTimestamp);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ } finally {
+ if (resultSet != null) {
+ try {
+ resultSet.close();
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+ }
+
+ protected SelectInfoResult createSelectInfoResult(ResultSet resultSet)
throws IOException {
+ try {
+ if (!resultSet.next()) {
+ log.error("Expected num rows and lease_acquisition_timestamp
returned from query but received nothing");
Review Comment:
Oh, good catch. I want this code path to terminate, so I will throw an
IO error instead.
Issue Time Tracking
-------------------
Worklog Id: (was: 871620)
Time Spent: 1h 40m (was: 1.5h)
> Unit Testing of Multi-active Algorithm
> --------------------------------------
>
> Key: GOBBLIN-1851
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1851
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> Tests all cases of trying to acquire a lease for a flow action event with one
> participant involved and makes corresponding fixes in the
> `MultiActiveLeaseArbiter`.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)