[ https://issues.apache.org/jira/browse/GOBBLIN-2073?focusedWorklogId=921347&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-921347 ]

ASF GitHub Bot logged work on GOBBLIN-2073:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/May/24 08:32
            Start Date: 30/May/24 08:32
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3952:
URL: https://github.com/apache/gobblin/pull/3952#discussion_r1620079757


##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java:
##########
@@ -35,9 +35,9 @@ public class FlowLaunchHandlerTest {
   String cronExpressionSuffix = truncateFirstTwoFieldsOfCronExpression(cronExpression);
   int schedulerBackOffMillis = 10;
   DagActionStore.DagAction dagAction = new DagActionStore.DagAction("flowName", "flowGroup",
-      String.valueOf(flowExecutionId), "jobName", DagActionStore.DagActionType.LAUNCH);
+      String.valueOf(flowExecutionId), "jobName", DagActionStore.DagActionType.LAUNCH, eventToRevisit);
   LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus =
-      new LeaseAttemptStatus.LeasedToAnotherStatus(dagAction, eventToRevisit, minimumLingerDurationMillis);
+      new LeaseAttemptStatus.LeasedToAnotherStatus(dagAction, minimumLingerDurationMillis);

Review Comment:
   `eventToRevisit` seems misnamed, since it's a timestamp, not an event. Is it the `consensusTimestampMillis`?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -52,14 +52,14 @@ public class MysqlMultiActiveLeaseArbiterTest {
   private static final String flowName = "testFlowName";
   private static final String jobName = "testJobName";
   private static final String flowExecutionId = "12345677";
+  private static final long eventTimeMillis = System.currentTimeMillis();

Review Comment:
   Usually test constants should be initialized to a well-known value. Would that work here?
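A minimal sketch of a well-known test constant: a fixed epoch-millisecond literal, with a comment stating the instant it encodes. The class name and the chosen instant are hypothetical.

```java
import java.time.Instant;

// Sketch only: class name and instant are hypothetical, not taken from the PR.
// A fixed literal replaces System.currentTimeMillis(), so every run sees the same value.
final class LeaseArbiterTestConstants {
  // 2024-05-30T00:00:00Z expressed as epoch milliseconds.
  static final long EVENT_TIME_MILLIS = 1_717_027_200_000L;

  private LeaseArbiterTestConstants() {}

  // Human-readable form of the constant, handy in assertion messages.
  static String eventTimeAsIso() {
    return Instant.ofEpochMilli(EVENT_TIME_MILLIS).toString();
  }
}
```

The value is identical on every run, so a failure involving it reproduces exactly. A date well in the past also cannot coincide with a timestamp the database generates during the test.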



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -232,16 +234,19 @@ public void testConditionallyAcquireLeaseIfFMatchingAllColsStatement() throws IO
    */
   @Test (dependsOnMethods = "testConditionallyAcquireLeaseIfFMatchingAllColsStatement")
   public void testConditionallyAcquireLeaseIfFinishedLeasingStatement()
-      throws IOException, InterruptedException, SQLException {
+      throws IOException, SQLException {
+    /*

Review Comment:
   Is this supposed to be commented out?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -103,21 +103,23 @@ Tests all cases of trying to acquire a lease (CASES 1-6 detailed below) for a fl
   public void testAcquireLeaseSingleParticipant() throws Exception {
     // Tests CASE 1 of acquire lease for a flow action event not present in DB
     LeaseAttemptStatus firstLaunchStatus =
-        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis, false, true);
+        mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, true);
     Assert.assertTrue(firstLaunchStatus instanceof LeaseAttemptStatus.LeaseObtainedStatus);
     LeaseAttemptStatus.LeaseObtainedStatus firstObtainedStatus =
         (LeaseAttemptStatus.LeaseObtainedStatus) firstLaunchStatus;
-    Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <=
-        firstObtainedStatus.getLeaseAcquisitionTimestamp());
+    long consensusEventTimeMillis = firstObtainedStatus.getEventTimeMillis();
+    Assert.assertTrue(consensusEventTimeMillis <= firstObtainedStatus.getLeaseAcquisitionTimestamp());
+    // Make sure consensusEventTimeMillis is set and it's not 0 or the original event time
+    Assert.assertTrue(consensusEventTimeMillis != eventTimeMillis && consensusEventTimeMillis != 0);

Review Comment:
   Let's use a positive assertion rather than a negative one, as the latter can hide many possible problems; see: https://en.wikipedia.org/wiki/Anna_Karenina_principle
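The Anna Karenina point can be made concrete. In this sketch, a hypothetical buggy value of `-1` (say, an unset sentinel) passes the PR's negative check but fails a positive one. `expectedConsensusMillis` is an assumed, independently known value. What the real test should compare against depends on how `MysqlMultiActiveLeaseArbiter` derives the consensus time, and this diff does not show that.

```java
// Sketch contrasting the two assertion styles; all values are illustrative, not from the PR.
final class AssertionStyles {
  // Negative form (as in the PR): accepts every value except the two it rules out.
  static boolean negativeCheck(long consensusMillis, long originalEventMillis) {
    return consensusMillis != originalEventMillis && consensusMillis != 0;
  }

  // Positive form: accepts exactly the one value the test expects.
  static boolean positiveCheck(long consensusMillis, long expectedConsensusMillis) {
    return consensusMillis == expectedConsensusMillis;
  }

  public static void main(String[] args) {
    long originalEventMillis = 1_717_027_200_000L;
    long expectedConsensusMillis = 1_717_027_200_123L; // hypothetical known value
    long buggyConsensusMillis = -1L; // e.g. an "unset" sentinel leaking through

    // prints true: the bug slips past the negative check
    System.out.println(negativeCheck(buggyConsensusMillis, originalEventMillis));
    // prints false: the positive check catches it
    System.out.println(positiveCheck(buggyConsensusMillis, expectedConsensusMillis));
  }
}
```

In the TestNG test itself, the positive form would be a single `Assert.assertEquals(consensusEventTimeMillis, expected)`. When it fails, it also reports both the actual and expected values.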



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -232,16 +234,19 @@ public void testConditionallyAcquireLeaseIfFMatchingAllColsStatement() throws IO
    */
   @Test (dependsOnMethods = "testConditionallyAcquireLeaseIfFMatchingAllColsStatement")

Review Comment:
   What's the `F` for in the name `testCond...IfFMatching...`?



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java:
##########
@@ -49,14 +49,25 @@ class DagAction {
     final String flowExecutionId;
     final String jobName;
     final DagActionType dagActionType;
-    final boolean isReminder;
+    boolean isReminder;
+    long eventTimeMillis;

Review Comment:
   Looks like an instance could be created without these being set. Shouldn't we prevent that via `final`?

   BTW, what is the "event" behind this time? Is it when the API request came in to, say, perform a "kill action"? Why is it critical to store that?
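On the `final` question, here is a sketch of what making every field `final` buys. It uses a hypothetical, simplified `ImmutableDagAction`, not Gobblin's actual class. With a single all-fields constructor, no instance can exist with `isReminder` or `eventTimeMillis` left at its default. A "change" such as the PR's `setEventTimeMillis(...)` becomes a copy instead. The `withEventTimeMillis` helper and the null checks are illustrative assumptions.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for DagActionStore.DagAction (not the Gobblin class).
// All fields are final and set by the only constructor, so no instance can exist with
// isReminder or eventTimeMillis silently left at a default value.
final class ImmutableDagAction {
  enum DagActionType { LAUNCH, KILL } // illustrative subset only

  private final String flowGroup;
  private final String flowName;
  private final String flowExecutionId;
  private final String jobName;
  private final DagActionType dagActionType;
  private final boolean isReminder;
  private final long eventTimeMillis;

  ImmutableDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName,
      DagActionType dagActionType, boolean isReminder, long eventTimeMillis) {
    this.flowGroup = Objects.requireNonNull(flowGroup, "flowGroup");
    this.flowName = Objects.requireNonNull(flowName, "flowName");
    this.flowExecutionId = Objects.requireNonNull(flowExecutionId, "flowExecutionId");
    this.jobName = Objects.requireNonNull(jobName, "jobName");
    this.dagActionType = Objects.requireNonNull(dagActionType, "dagActionType");
    this.isReminder = isReminder;
    this.eventTimeMillis = eventTimeMillis;
  }

  // Copy-on-change in place of a setter: the caller gets a new instance,
  // and the original (possibly shared) instance is never modified.
  ImmutableDagAction withEventTimeMillis(long newEventTimeMillis) {
    return new ImmutableDagAction(flowGroup, flowName, flowExecutionId, jobName,
        dagActionType, isReminder, newEventTimeMillis);
  }

  String getFlowName() { return flowName; }
  boolean isReminder() { return isReminder; }
  long getEventTimeMillis() { return eventTimeMillis; }
}
```

If the class stays on Lombok, annotating it with `@Value` instead of `@Data` gives the same guarantees: private final fields, an all-args constructor, and no setters.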



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -305,24 +306,26 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction dagAction, lo
           DagActionStore.DagAction updatedDagAction =
               adoptConsensusFlowExecutionId ? dagAction.updateFlowExecutionId(dbEventTimestamp.getTime()) : dagAction;
           log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid",
-              updatedDagAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
+              updatedDagAction, dagAction.isReminder ? "reminder" : "original", dbCurrentTimestamp.getTime());
           // Utilize db timestamp for reminder
-          return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction, dbEventTimestamp.getTime(),
+          updatedDagAction.setEventTimeMillis(dbEventTimestamp.getTime());

Review Comment:
   I've really had a lot of trouble grasping the motivation behind this PR... but overall, is the crux of it that you'd like `DagActionReminderScheduler` to operate on its own personal extension of a `DagAction`, such as the example (`Reflection`) below?



##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -264,12 +269,19 @@ public void testConditionallyAcquireLeaseIfFinishedLeasingStatement()
    */
   @Test (dependsOnMethods = "testConditionallyAcquireLeaseIfFinishedLeasingStatement")
   public void testOlderReminderEventAcquireLease() throws IOException {
+    // Use current time as flowName to create unique event
+    DagActionStore.DagAction newLaunchAction = new DagActionStore.DagAction(flowGroup,
+        String.valueOf(System.currentTimeMillis()), flowExecutionId, jobName, DagActionStore.DagActionType.LAUNCH);

Review Comment:
   Please avoid "current time" in a test; use a known value instead. Unless this is somehow called in a loop, it should be easy to choose a distinct flow name.

   The name is confusing too, given there's already a `launchDagAction2`.

   Also, are tests like this the only callers of the 5-param version of the `DagAction` constructor?
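A sketch of the "known value" alternative to `String.valueOf(System.currentTimeMillis())`, using one fixed, descriptively named flow name per scenario. The class, the constant names, and the `testFlowGroup` value are hypothetical; only `testFlowName`, `testJobName`, and `12345677` appear in the diff. An unmodifiable `List` stands in for the identifying fields of a `DagAction`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Sketch: deterministic, self-describing fixtures instead of using the current time as a
// flow name. Hypothetical: the class, constant names, and flowGroup value. A List stands in
// for the identifying fields of a DagActionStore.DagAction (group, name, execution id, job).
final class LeaseArbiterFixtures {
  static final String FLOW_GROUP = "testFlowGroup";
  static final String FLOW_NAME = "testFlowName";
  static final String FLOW_EXECUTION_ID = "12345677";
  static final String JOB_NAME = "testJobName";

  // Key shared by most tests.
  static final List<String> LAUNCH_ACTION_KEY = key(FLOW_NAME);

  // A fixed flow name reserved for the older-reminder scenario: distinct from every
  // other fixture, identical on every run, and named for the test that uses it.
  static final List<String> OLDER_REMINDER_LAUNCH_ACTION_KEY = key("testFlowNameOlderReminder");

  private LeaseArbiterFixtures() {}

  private static List<String> key(String flowName) {
    return Collections.unmodifiableList(
        Arrays.asList(FLOW_GROUP, flowName, FLOW_EXECUTION_ID, JOB_NAME));
  }
}
```

Because the fixture's name states which test owns it, it cannot be confused with a numbered fixture like `launchDagAction2`. A DB row left behind by a failed run is also easy to trace.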
   
   



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -305,24 +306,26 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction dagAction, lo
           DagActionStore.DagAction updatedDagAction =
               adoptConsensusFlowExecutionId ? dagAction.updateFlowExecutionId(dbEventTimestamp.getTime()) : dagAction;
           log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 2: Same event, lease is valid",
-              updatedDagAction, isReminderEvent ? "reminder" : "original", dbCurrentTimestamp.getTime());
+              updatedDagAction, dagAction.isReminder ? "reminder" : "original", dbCurrentTimestamp.getTime());
           // Utilize db timestamp for reminder
-          return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction, dbEventTimestamp.getTime(),
+          updatedDagAction.setEventTimeMillis(dbEventTimestamp.getTime());

Review Comment:
   I see this being updated, but unless I missed something critical, it won't actually be durably persisted to the DB, will it?

   `DagAction` is meant for sharing between the various distributed hosts. In almost any distributed sharing context, mutability is at best challenging and at worst a tragic invitation to error. Therefore I DON'T SUGGEST durably persisting the event time. Immutability should be our standard technique.

   Where there's a need for additional local-to-each-host info, just compose around `DagAction`, e.g.:
   ```
   @Data
   class ThisHostDagActionReflection {
     private final DagAction dagAction;
     private final long timestampMillis;
   }
   ```
   [yikes!  crappy name... sorry]
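Here is the same composition as a plain-Java sketch without Lombok. `SharedDagAction` is a hypothetical stand-in for the shared, immutable `DagAction`, and `withTimestampMillis` is an illustrative helper, not something in the PR. The host-local timestamp lives only in the wrapper, so "updating" it never touches the object that other hosts see.

```java
import java.util.Objects;

// Hypothetical stand-in for the immutable DagAction shared across hosts.
final class SharedDagAction {
  final String flowGroup;
  final String flowName;
  final String flowExecutionId;
  final String jobName;

  SharedDagAction(String flowGroup, String flowName, String flowExecutionId, String jobName) {
    this.flowGroup = Objects.requireNonNull(flowGroup, "flowGroup");
    this.flowName = Objects.requireNonNull(flowName, "flowName");
    this.flowExecutionId = Objects.requireNonNull(flowExecutionId, "flowExecutionId");
    this.jobName = Objects.requireNonNull(jobName, "jobName");
  }
}

// Host-local view: composes the shared action with info only this host needs,
// e.g. a timestamp learned during lease arbitration.
final class ThisHostDagActionReflection {
  private final SharedDagAction dagAction;
  private final long timestampMillis;

  ThisHostDagActionReflection(SharedDagAction dagAction, long timestampMillis) {
    this.dagAction = Objects.requireNonNull(dagAction, "dagAction");
    this.timestampMillis = timestampMillis;
  }

  SharedDagAction getDagAction() { return dagAction; }
  long getTimestampMillis() { return timestampMillis; }

  // A new timestamp yields a new wrapper; the shared DagAction is reused untouched.
  ThisHostDagActionReflection withTimestampMillis(long newTimestampMillis) {
    return new ThisHostDagActionReflection(dagAction, newTimestampMillis);
  }
}
```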





Issue Time Tracking
-------------------

    Worklog Id:     (was: 921347)
    Time Spent: 20m  (was: 10m)

> Add eventTime and reminder fields to DagAction POJO
> ---------------------------------------------------
>
>                 Key: GOBBLIN-2073
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2073
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> An event timestamp and a reminder flag are used in conjunction with DagActions
> for lease arbitration and for reminder dagActions. It makes sense to include
> these fields in the DagAction object itself so the information can easily be
> transferred from one location to another. In particular, reminder events,
> which are used to check back in on incomplete dagAction leases, rely on both
> fields, and they are easiest to keep track of when they are maintained within
> the consensusDagAction returned in a LeaseAttemptStatus.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
