[ 
https://issues.apache.org/jira/browse/GOBBLIN-1859?focusedWorklogId=872701&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-872701
 ]

ASF GitHub Bot logged work on GOBBLIN-1859:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Jul/23 21:10
            Start Date: 24/Jul/23 21:10
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3721:
URL: https://github.com/apache/gobblin/pull/3721#discussion_r1272742265


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -101,8 +102,8 @@ protected interface CheckedFunction<T, R> {
   private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s "
       + "(primary_key INT, epsilon INT, linger INT, PRIMARY KEY (primary_key))";
   // Only insert epsilon and linger values from config if this table does not contain a pre-existing values already.
-  private static final String UPSERT_CONSTANTS_TABLE_STATEMENT = "INSERT INTO %s (primary_key, epsilon, linger) "
-      + "VALUES(1, ?, ?) ON DUPLICATE KEY UPDATE epsilon=VALUES(epsilon), linger=VALUES(linger)";
+  private static final String INSERT_CONSTANTS_TABLE_STATEMENT = "INSERT INTO %s (primary_key, epsilon, linger) "
+      + "VALUES(1, ?, ?)";

Review Comment:
   sorry if I'm missing something, but, if not using `ON DUPLICATE KEY 
UPDATE`... how are `epsilon` and/or `linger` ever updated?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -212,11 +220,19 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
         log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 1: no existing row for this flow action, then go"
                 + " ahead and insert", flowAction, eventTimeMillis);
         String formattedAcquireLeaseNewRowStatement =
-            String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, this.leaseArbiterTableName);
+            String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, this.leaseArbiterTableName,
+                this.leaseArbiterTableName);
         int numRowsUpdated = withPreparedStatement(formattedAcquireLeaseNewRowStatement,
             insertStatement -> {
               completeInsertPreparedStatement(insertStatement, flowAction);
-              return insertStatement.executeUpdate();
+              try {
+                return insertStatement.executeUpdate();
+              } catch (SQLIntegrityConstraintViolationException e) {
+                if (!e.getMessage().contains("Duplicate entry")) {
+                  throw e;
+                }
+              }
+              return 0;

Review Comment:
   feels like this belongs inside the `catch`, no? (even possibly as an `else` 
of the dupe entry)
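
A minimal sketch of the restructuring suggested in this comment, not the PR's actual code: the `return 0` moves into the `catch`, with the rethrow as the `else` branch. `DuplicateEntrySketch`, `UpdateCall`, and `executeIgnoringDuplicate` are hypothetical names; `UpdateCall` stands in for `insertStatement.executeUpdate()` so the example runs without a database.

```java
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;

public class DuplicateEntrySketch {

  /** Hypothetical stand-in for PreparedStatement#executeUpdate(). */
  @FunctionalInterface
  public interface UpdateCall {
    int execute() throws SQLException;
  }

  /**
   * Runs the update and treats a duplicate-key violation as "0 rows updated".
   * Any other integrity violation is rethrown to the caller.
   */
  public static int executeIgnoringDuplicate(UpdateCall call) throws SQLException {
    try {
      return call.execute();
    } catch (SQLIntegrityConstraintViolationException e) {
      String message = e.getMessage();
      if (message != null && message.contains("Duplicate entry")) {
        // Another participant inserted the row first: nothing was written.
        return 0;
      } else {
        throw e;
      }
    }
  }

  public static void main(String[] args) throws SQLException {
    // Successful insert: the row count is passed through unchanged.
    System.out.println(executeIgnoringDuplicate(() -> 1));
    // Duplicate key: reported as zero rows rather than as an exception.
    System.out.println(executeIgnoringDuplicate(() -> {
      throw new SQLIntegrityConstraintViolationException("Duplicate entry '1' for key 'PRIMARY'");
    }));
  }
}
```

With this shape every path through the method either returns or throws from inside the `try`/`catch`, so no trailing `return 0` is needed after it. Matching on the message text mirrors the diff above; whether that check is robust across drivers is a separate question.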



##########
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -143,4 +159,128 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
     Assert.assertTrue(sixthObtainedStatus.getEventTimestamp()
         <= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
   }
+
+  /*
+     Tests CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT to ensure an insertion is not attempted unless the table
+     state remains the same as the prior read, which expects no row matching the primary key in the table
+     Note: this isolates and tests CASE 1 in which another participant could have acquired the lease between the time
+     the read was done and subsequent write was carried out
+  */
+  @Test //(dependsOnMethods = "testAcquireLeaseSingleParticipant")
+  public void testConditionallyAcquireLeaseIfNewRow() throws IOException {
+    // Inserting the first time should update 1 row
+    int numRowsUpdated = this.mysqlMultiActiveLeaseArbiter.withPreparedStatement(formattedAcquireLeaseNewRowStatement,
+        insertStatement -> {
+          completeInsertPreparedStatement(insertStatement, resumeDagAction);
+          return insertStatement.executeUpdate();
+        }, true);
+    Assert.assertEquals(numRowsUpdated, 1);

Review Comment:
   please explain what's being tested here in terms of the class under test.  I would 
expect the test to call one encapsulated method of `MysqlMALeaseArbiter`'s API, 
but here it seems like you're combining bits and pieces of it (like 
`withPreparedStatement`), which arguably should be `private`/`protected`.
   
   generally the public interface is up for testing... maybe with verification 
for a SMALL number of `@VisibleForTesting` methods.
   
   perhaps I'm just not understanding the connection between this test and the 
overall public API...



##########
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -143,4 +159,128 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
     Assert.assertTrue(sixthObtainedStatus.getEventTimestamp()
         <= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
   }
+
+  /*
+     Tests CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT to ensure an insertion is not attempted unless the table
+     state remains the same as the prior read, which expects no row matching the primary key in the table
+     Note: this isolates and tests CASE 1 in which another participant could have acquired the lease between the time
+     the read was done and subsequent write was carried out
+  */
+  @Test //(dependsOnMethods = "testAcquireLeaseSingleParticipant")

Review Comment:
   remove comment?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 872701)
    Time Spent: 1h 10m  (was: 1h)

> Multi-active Unit Test for Multiple Participant
> -----------------------------------------------
>
>                 Key: GOBBLIN-1859
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1859
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> In multi-active mode, multiple participants will be reading from and writing 
> to a MySQL table. We want to make sure each participant acts on the table in 
> a manner that takes into account that the state may have changed while the 
> MultiActiveLeaseArbiter is processing the result of a READ. This PR adds unit 
> tests that validate that SQL insertion statements are conditional upon the 
> state of the particular row corresponding to the flow action event being 
> unchanged from the read made by this participant. 
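
The behavior described in the issue can be illustrated with an in-memory analogy. This is only a sketch under stated assumptions, not Gobblin code: a `ConcurrentHashMap` stands in for the MySQL lease table, and `ConditionalInsertSketch`, `insertIfStillAbsent`, and the `"flow-1"` key are hypothetical names. Two participants both read "no row", but only the first conditional insert writes anything.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ConditionalInsertSketch {

  // In-memory stand-in for the lease table, keyed by a flow action identifier.
  private final ConcurrentMap<String, Long> leaseTable = new ConcurrentHashMap<>();

  /** Returns the stored event timestamp, or null when no row exists. */
  public Long read(String flowActionKey) {
    return leaseTable.get(flowActionKey);
  }

  /**
   * Inserts only if the row is still absent, as it was at the prior read.
   * Returns the number of "rows" written: 1 on success, 0 if another
   * participant inserted between the read and this write.
   */
  public int insertIfStillAbsent(String flowActionKey, long eventTimeMillis) {
    return leaseTable.putIfAbsent(flowActionKey, eventTimeMillis) == null ? 1 : 0;
  }

  public static void main(String[] args) {
    ConditionalInsertSketch table = new ConditionalInsertSketch();
    // Both participants observe that no row exists yet.
    Long seenByFirst = table.read("flow-1");
    Long seenBySecond = table.read("flow-1");
    // The first write succeeds; the second finds the state changed and writes nothing.
    System.out.println(table.insertIfStillAbsent("flow-1", 100L));
    System.out.println(table.insertIfStillAbsent("flow-1", 200L));
  }
}
```

The returned count plays the role of `numRowsUpdated` in the test under review, where the first insertion is expected to update exactly one row.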



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
