[
https://issues.apache.org/jira/browse/GOBBLIN-1859?focusedWorklogId=872701&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-872701
]
ASF GitHub Bot logged work on GOBBLIN-1859:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 24/Jul/23 21:10
Start Date: 24/Jul/23 21:10
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3721:
URL: https://github.com/apache/gobblin/pull/3721#discussion_r1272742265
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -101,8 +102,8 @@ protected interface CheckedFunction<T, R> {
private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s "
+ "(primary_key INT, epsilon INT, linger INT, PRIMARY KEY (primary_key))";
// Only insert epsilon and linger values from config if this table does not contain a pre-existing values already.
- private static final String UPSERT_CONSTANTS_TABLE_STATEMENT = "INSERT INTO %s (primary_key, epsilon, linger) "
- + "VALUES(1, ?, ?) ON DUPLICATE KEY UPDATE epsilon=VALUES(epsilon), linger=VALUES(linger)";
+ private static final String INSERT_CONSTANTS_TABLE_STATEMENT = "INSERT INTO %s (primary_key, epsilon, linger) "
+ + "VALUES(1, ?, ?)";
Review Comment:
sorry if I'm missing something, but, if not using `ON DUPLICATE KEY
UPDATE`... how are `epsilon` and/or `linger` ever updated?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -212,11 +220,19 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 1: no existing row for this flow action, then go"
+ " ahead and insert", flowAction, eventTimeMillis);
String formattedAcquireLeaseNewRowStatement =
- String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, this.leaseArbiterTableName);
+ String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, this.leaseArbiterTableName,
+ this.leaseArbiterTableName);
int numRowsUpdated = withPreparedStatement(formattedAcquireLeaseNewRowStatement,
insertStatement -> {
completeInsertPreparedStatement(insertStatement, flowAction);
- return insertStatement.executeUpdate();
+ try {
+ return insertStatement.executeUpdate();
+ } catch (SQLIntegrityConstraintViolationException e) {
+ if (!e.getMessage().contains("Duplicate entry")) {
+ throw e;
+ }
+ }
+ return 0;
Review Comment:
feels like this belongs inside the `catch`, no? (even possibly as an `else`
of the dupe entry)
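
A minimal sketch of the shape suggested in this comment, where the `0` result is returned from inside the `catch` and the rethrow becomes the `else` branch. It is self-contained rather than taken from the PR: `DuplicateInsertSketch`, `Update`, and `executeIgnoringDuplicate` are hypothetical names, `Update` stands in for `insertStatement.executeUpdate()`, and the null check on the message is an added assumption that the original diff does not have.

```java
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;

public class DuplicateInsertSketch {
  /** Hypothetical stand-in for a call such as PreparedStatement.executeUpdate(). */
  public interface Update {
    int run() throws SQLException;
  }

  /**
   * Runs the update and treats a duplicate-key violation as "0 rows updated".
   * Any other integrity violation is rethrown unchanged.
   */
  public static int executeIgnoringDuplicate(Update update) throws SQLException {
    try {
      return update.run();
    } catch (SQLIntegrityConstraintViolationException e) {
      // Assumption: a null message is not treated as a duplicate entry.
      if (e.getMessage() != null && e.getMessage().contains("Duplicate entry")) {
        return 0;
      } else {
        throw e;
      }
    }
  }

  public static void main(String[] args) throws SQLException {
    int inserted = executeIgnoringDuplicate(() -> 1);
    int duplicate = executeIgnoringDuplicate(() -> {
      throw new SQLIntegrityConstraintViolationException("Duplicate entry '1' for key 'PRIMARY'");
    });
    System.out.println(inserted + " " + duplicate); // prints "1 0"
  }
}
```

With this shape every path through the method either returns or throws explicitly, so no trailing `return 0` is needed after the `try` block.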
##########
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -143,4 +159,128 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
Assert.assertTrue(sixthObtainedStatus.getEventTimestamp()
<= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
}
+
+ /*
+ Tests CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT to ensure an insertion is not attempted unless the table
+ state remains the same as the prior read, which expects no row matching the primary key in the table
+ Note: this isolates and tests CASE 1 in which another participant could have acquired the lease between the time
+ the read was done and subsequent write was carried out
+ */
+ @Test //(dependsOnMethods = "testAcquireLeaseSingleParticipant")
+ public void testConditionallyAcquireLeaseIfNewRow() throws IOException {
+ // Inserting the first time should update 1 row
+ int numRowsUpdated = this.mysqlMultiActiveLeaseArbiter.withPreparedStatement(formattedAcquireLeaseNewRowStatement,
+ insertStatement -> {
+ completeInsertPreparedStatement(insertStatement, resumeDagAction);
+ return insertStatement.executeUpdate();
+ }, true);
+ Assert.assertEquals(numRowsUpdated, 1);
Review Comment:
please explain what's being tested here in terms of the class under test. I would
expect the test to call one encapsulated method of `MysqlMALeaseArbiter`'s API,
but here it seems like you're combining bits and pieces of it (like
`withPreparedStatement`), which arguably should be `private`/`protected`.
generally the public interface is what's up for testing... maybe with verification
for a SMALL number of `@VisibleForTesting` methods.
perhaps I'm just not understanding the connection between this test and the
overall public API...
##########
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -143,4 +159,128 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
Assert.assertTrue(sixthObtainedStatus.getEventTimestamp()
<= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
}
+
+ /*
+ Tests CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT to ensure an insertion is not attempted unless the table
+ state remains the same as the prior read, which expects no row matching the primary key in the table
+ Note: this isolates and tests CASE 1 in which another participant could have acquired the lease between the time
+ the read was done and subsequent write was carried out
+ */
+ @Test //(dependsOnMethods = "testAcquireLeaseSingleParticipant")
Review Comment:
remove comment?
Issue Time Tracking
-------------------
Worklog Id: (was: 872701)
Time Spent: 1h 10m (was: 1h)
> Multi-active Unit Test for Multiple Participant
> -----------------------------------------------
>
> Key: GOBBLIN-1859
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1859
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> In multi-active mode, multiple participants will be reading/writing to MySQL
> table. We want to make sure each participant acts on the table in a manner
> that takes into account that the state may have changed while the
> MultiActiveLeaseArbiter is processing the result of a READ. This PR adds unit
> tests that validate SQL Insertion statements are conditional upon the state
> of the particular row corresponding to the flow action event being unchanged
> from the read made by this participant.
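
The idea in the description above (an insert that only succeeds if the row is still absent, so a participant that read "no row" cannot overwrite a competing participant's write) can be illustrated with an in-memory analogy. This is only a sketch under stated assumptions: it uses `ConcurrentHashMap.putIfAbsent` in place of the conditional SQL statement, and `ConditionalInsertSketch`, `insertIfNoRowExists`, and the key and participant names are hypothetical, not part of Gobblin.

```java
import java.util.concurrent.ConcurrentHashMap;

public class ConditionalInsertSketch {
  // Hypothetical in-memory stand-in for the lease table: flow action key -> lease owner.
  private final ConcurrentHashMap<String, String> leaseTable = new ConcurrentHashMap<>();

  /**
   * Inserts a row only if none exists for the key.
   * Returns 1 if the row was inserted and 0 if another participant got there first,
   * mirroring the "number of rows updated" result of a conditional insert.
   */
  public int insertIfNoRowExists(String flowActionKey, String participant) {
    return leaseTable.putIfAbsent(flowActionKey, participant) == null ? 1 : 0;
  }

  /** Returns the participant currently recorded for the key, or null if there is no row. */
  public String owner(String flowActionKey) {
    return leaseTable.get(flowActionKey);
  }

  public static void main(String[] args) {
    ConditionalInsertSketch table = new ConditionalInsertSketch();
    String key = "flowGroup.flowName.RESUME";
    // Both participants previously read "no row" and now attempt the insert.
    int first = table.insertIfNoRowExists(key, "participant-A");
    int second = table.insertIfNoRowExists(key, "participant-B");
    System.out.println(first + " " + second + " " + table.owner(key)); // prints "1 0 participant-A"
  }
}
```

The second attempt reports 0 rows and leaves the first participant's row untouched, which is the property the unit tests described above are meant to validate against the real MySQL table.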