This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 083913de9 [GOBBLIN-2070] Add eventTimeMillis field to 
leaseAttemptStatus for adhoc flows where… (#3951)
083913de9 is described below

commit 083913de95bfce14cd3147f04ea79d5dee0c0f32
Author: umustafi <[email protected]>
AuthorDate: Tue May 21 13:58:20 2024 -0700

    [GOBBLIN-2070] Add eventTimeMillis field to leaseAttemptStatus for adhoc 
flows where… (#3951)
    
    * Add eventTimeMillis field to leaseAttemptStatus for adhoc flows where 
flowExecutionId is different than the event time of the lease
    * Update tests to capture edge case
---
 .../modules/orchestration/FlowLaunchHandler.java   |  2 +-
 .../modules/orchestration/LeaseAttemptStatus.java  | 32 ++++++++--------------
 .../MysqlMultiActiveLeaseArbiter.java              | 10 ++++---
 .../DagManagementTaskStreamImplTest.java           |  4 +--
 .../orchestration/FlowLaunchHandlerTest.java       |  7 +++--
 .../MysqlMultiActiveLeaseArbiterTest.java          | 18 ++++++++----
 6 files changed, 36 insertions(+), 37 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
index 1c7c82358..1c37a404c 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java
@@ -126,7 +126,7 @@ public class FlowLaunchHandler {
     } else if (leaseAttempt instanceof 
LeaseAttemptStatus.LeasedToAnotherStatus) { // already have one: just return it
       return Optional.of((LeaseAttemptStatus.LeasedToAnotherStatus) 
leaseAttempt);
     } else if (leaseAttempt instanceof LeaseAttemptStatus.LeaseObtainedStatus) 
{ // remind w/o delay to immediately re-attempt handling
-      return Optional.of(new 
LeaseAttemptStatus.LeasedToAnotherStatus(leaseAttempt.getConsensusDagAction(), 
0L));
+      return Optional.of(new 
LeaseAttemptStatus.LeasedToAnotherStatus(leaseAttempt.getConsensusDagAction(), 
((LeaseAttemptStatus.LeaseObtainedStatus) leaseAttempt).getEventTimeMillis(), 
0L));
     } else {
       throw new RuntimeException("unexpected `LeaseAttemptStatus` derived 
type: '" + leaseAttempt.getClass().getName() + "' in '" + leaseAttempt + "'");
     }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
index f77427892..1087c6be4 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LeaseAttemptStatus.java
@@ -51,26 +51,23 @@ public abstract class LeaseAttemptStatus {
   public static class NoLongerLeasingStatus extends LeaseAttemptStatus {}
 
   /*
-  The participant calling this method acquired the lease for the event in 
question. `Dag action`'s flow execution id
-  is the timestamp associated with the lease and the time the caller obtained 
the lease is stored within the
-  `leaseAcquisitionTimestamp` field. The `multiActiveLeaseArbiter` reference 
is used to recordLeaseSuccess for the
-  current LeaseObtainedStatus via the completeLease method from a caller 
without access to the {@link MultiActiveLeaseArbiter}.
+  The participant calling this method acquired the lease for the event in 
question.
+  The timestamp associated with the lease is stored in the `eventTimeMillis` field 
and the time the caller obtained the
+  lease is stored within the `leaseAcquisitionTimestamp` field. Note that the 
`Dag action` returned by the lease
+   arbitration attempt will be unchanged for flows that do not adopt the 
consensus eventTimeMillis as the flow execution
+   id, so a separate field must be maintained to track the eventTimeMillis for 
lease completion. The
+   `multiActiveLeaseArbiter` reference is used to recordLeaseSuccess for the 
current LeaseObtainedStatus via the
+   completeLease method from a caller without access to the {@link 
MultiActiveLeaseArbiter}.
   */
   @Data
   public static class LeaseObtainedStatus extends LeaseAttemptStatus {
     private final DagActionStore.DagAction consensusDagAction;
+    private final long eventTimeMillis;
     private final long leaseAcquisitionTimestamp;
     private final long minimumLingerDurationMillis;
     @Getter(AccessLevel.NONE)
     private final MultiActiveLeaseArbiter multiActiveLeaseArbiter;
 
-    /**
-     * @return event time in millis since epoch for the event of this lease 
acquisition
-     */
-    public long getEventTimeMillis() {
-      return Long.parseLong(consensusDagAction.getFlowExecutionId());
-    }
-
     /**
      * Completes the lease referenced by this status object if it has not 
expired.
      * @return true if able to complete lease, false otherwise.
@@ -83,22 +80,15 @@ public abstract class LeaseAttemptStatus {
 
   /*
   This dag action event already has a valid lease owned by another participant.
-  `Dag action`'s flow execution id is the timestamp the lease is associated 
with, however the dag action event it
-  corresponds to may be a different and distinct occurrence of the same event.
+  `eventTimeMillis` corresponds to the timestamp of the existing lease 
associated with this dag action, however the dag
+  action event it corresponds to may be a different and distinct occurrence of 
the same event.
   `minimumLingerDurationMillis` is the minimum amount of time to wait before 
this participant should return to check if
   the lease has completed or expired
    */
   @Data
   public static class LeasedToAnotherStatus extends LeaseAttemptStatus {
     private final DagActionStore.DagAction consensusDagAction;
+    private final long eventTimeMillis;
     private final long minimumLingerDurationMillis;
-
-    /**
-     * Returns event time in millis since epoch for the event whose lease was 
obtained by another participant.
-     * @return
-     */
-    public long getEventTimeMillis() {
-      return Long.parseLong(consensusDagAction.getFlowExecutionId());
-    }
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
index 4479358ec..2702a81d5 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java
@@ -307,7 +307,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
           log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - 
CASE 2: Same event, lease is valid",
               updatedDagAction, isReminderEvent ? "reminder" : "original", 
dbCurrentTimestamp.getTime());
           // Utilize db timestamp for reminder
-          return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction,
+          return new 
LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction, 
dbEventTimestamp.getTime(),
               dbLeaseAcquisitionTimestamp.getTime() + dbLinger - 
dbCurrentTimestamp.getTime());
         }
         DagActionStore.DagAction updatedDagAction =
@@ -315,7 +315,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
         log.debug("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE 
3: Distinct event, lease is valid",
             updatedDagAction, isReminderEvent ? "reminder" : "original", 
dbCurrentTimestamp.getTime());
         // Utilize db lease acquisition timestamp for wait time
-        return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction,
+        return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction, 
dbCurrentTimestamp.getTime(),
             dbLeaseAcquisitionTimestamp.getTime() + dbLinger  - 
dbCurrentTimestamp.getTime());
       } // Lease is invalid
       else if (leaseValidityStatus == 2) {
@@ -514,12 +514,14 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
     if (numRowsUpdated == 1) {
       log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}] 
successfully!", updatedDagAction,
           isReminderEvent ? "reminder" : "original", 
selectInfoResult.eventTimeMillis);
-      return new LeaseAttemptStatus.LeaseObtainedStatus(updatedDagAction, 
selectInfoResult.getLeaseAcquisitionTimeMillis().get(), 
minimumLingerDurationMillis, this);
+      return new LeaseAttemptStatus.LeaseObtainedStatus(updatedDagAction, 
selectInfoResult.eventTimeMillis,
+          selectInfoResult.getLeaseAcquisitionTimeMillis().get(), 
minimumLingerDurationMillis, this);
     }
     log.info("Another participant acquired lease in between for [{}, is: {}, 
eventTimestamp: {}] - num rows updated: {}",
         updatedDagAction, isReminderEvent ? "reminder" : "original", 
selectInfoResult.eventTimeMillis, numRowsUpdated);
     // Another participant acquired lease in between
-    return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction, 
minimumLingerDurationMillis);
+    return new LeaseAttemptStatus.LeasedToAnotherStatus(updatedDagAction, 
selectInfoResult.eventTimeMillis,
+        minimumLingerDurationMillis);
   }
 
   /**
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
index 607d44389..f8c0f8296 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java
@@ -110,8 +110,8 @@ public class DagManagementTaskStreamImplTest {
     when(dagManagementTaskStream.getDagActionProcessingLeaseArbiter()
         .tryAcquireLease(any(DagActionStore.DagAction.class), anyLong(), 
anyBoolean(), anyBoolean()))
         .thenReturn(new LeaseAttemptStatus.NoLongerLeasingStatus(),
-            new LeaseAttemptStatus.LeasedToAnotherStatus(launchAction, 15),
-            new LeaseAttemptStatus.LeaseObtainedStatus(launchAction, 0, 5, 
null));
+            new LeaseAttemptStatus.LeasedToAnotherStatus(launchAction, 3, 15),
+            new LeaseAttemptStatus.LeaseObtainedStatus(launchAction, 5, 0, 5, 
null));
     DagTask dagTask = dagManagementTaskStream.next();
     Assert.assertTrue(dagTask instanceof LaunchDagTask);
     DagProc dagProc = dagTask.host(this.dagProcFactory);
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
index dea34b6cd..b4feeed84 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandlerTest.java
@@ -28,15 +28,16 @@ import 
org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 
 
 public class FlowLaunchHandlerTest {
-  long eventToRevisit = 123000L;
+  long flowExecutionId = 123000L;
+  long eventToRevisit = 641000L;
   long minimumLingerDurationMillis = 2000L;
   String cronExpression = 
FlowLaunchHandler.createCronFromDelayPeriod(minimumLingerDurationMillis);
   String cronExpressionSuffix = 
truncateFirstTwoFieldsOfCronExpression(cronExpression);
   int schedulerBackOffMillis = 10;
   DagActionStore.DagAction dagAction = new 
DagActionStore.DagAction("flowName", "flowGroup",
-      String.valueOf(eventToRevisit), "jobName", 
DagActionStore.DagActionType.LAUNCH);
+      String.valueOf(flowExecutionId), "jobName", 
DagActionStore.DagActionType.LAUNCH);
   LeaseAttemptStatus.LeasedToAnotherStatus leasedToAnotherStatus =
-      new LeaseAttemptStatus.LeasedToAnotherStatus(dagAction, 
minimumLingerDurationMillis);
+      new LeaseAttemptStatus.LeasedToAnotherStatus(dagAction, eventToRevisit, 
minimumLingerDurationMillis);
 
   /**
    * Remove first two fields from cron expression representing seconds and 
minutes to return truncated cron expression
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
index 2699148b6..607d8f3b0 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiterTest.java
@@ -239,7 +239,8 @@ public class MysqlMultiActiveLeaseArbiterTest {
     DagActionStore.DagAction updatedResumeDagAction = 
resumeDagAction.updateFlowExecutionId(
         selectInfoResult.getEventTimeMillis());
     boolean markedSuccess = 
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new 
LeaseAttemptStatus.LeaseObtainedStatus(
-        updatedResumeDagAction, 
selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null));
+        updatedResumeDagAction, selectInfoResult.getEventTimeMillis(),
+        selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null));
     Assert.assertTrue(markedSuccess);
     // Ensure no NPE results from calling this after a lease has been 
completed and acquisition timestamp val is NULL
     mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1, 
resumeDagAction,
@@ -321,7 +322,8 @@ public class MysqlMultiActiveLeaseArbiterTest {
      DagActionStore.DagAction updatedResumeDagAction = 
resumeDagAction.updateFlowExecutionId(
          selectInfoResult.getEventTimeMillis());
      boolean markedSuccess = 
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new 
LeaseAttemptStatus.LeaseObtainedStatus(
-         updatedResumeDagAction, 
selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, null));
+         updatedResumeDagAction, selectInfoResult.getEventTimeMillis(),
+         selectInfoResult.getLeaseAcquisitionTimeMillis().get(), LINGER, 
null));
      Assert.assertTrue(markedSuccess);
 
      // Sleep enough time for the event to have been considered distinct
@@ -333,9 +335,10 @@ public class MysqlMultiActiveLeaseArbiterTest {
    }
 
    /*
-   Tests calling `tryAcquireLease` when `adoptConsensusFlowExecutionId` is set 
to True and verify that flowExecutionId
+   Tests calling `tryAcquireLease` when `adoptConsensusFlowExecutionId` is set 
to False and verifies that the flowExecutionId
    returned is the same as flowExecutionId provided to it for a 
LeaseObtainedStatus and LeasedToAnotherStatus object
-   (CASE 1 & 2).
+   (CASE 1 & 2). It also verifies that the `eventTimeMillis` stored in a lease 
obtained status can be used to complete
+   the lease.
    */
   @Test
   public void testSkipAdoptingConsensusFlowExecutionId() throws IOException {
@@ -346,18 +349,21 @@ public class MysqlMultiActiveLeaseArbiterTest {
     LeaseAttemptStatus.LeaseObtainedStatus firstObtainedStatus =
         (LeaseAttemptStatus.LeaseObtainedStatus) firstLaunchStatus;
     Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() <= 
firstObtainedStatus.getLeaseAcquisitionTimestamp());
+    Assert.assertTrue(firstObtainedStatus.getEventTimeMillis() != 
Long.valueOf(firstObtainedStatus.getConsensusDagAction().getFlowExecutionId()));
     Assert.assertTrue(firstObtainedStatus.getConsensusDagAction().equals(
         new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.LAUNCH)));
 
     // A second attempt to obtain a lease on the same action should return a 
LeasedToAnotherStatus which also contains
-    // the original flowExecutionId
+    // the original flowExecutionId and the same event time from the previous 
LeaseAttemptStatus
     LeaseAttemptStatus secondLaunchStatus =
         mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction2, 
eventTimeMillis, false, false);
     Assert.assertTrue(secondLaunchStatus instanceof 
LeaseAttemptStatus.LeasedToAnotherStatus);
     LeaseAttemptStatus.LeasedToAnotherStatus secondLeasedToAnotherStatus =
         (LeaseAttemptStatus.LeasedToAnotherStatus) secondLaunchStatus;
     Assert.assertEquals(firstObtainedStatus.getEventTimeMillis(), 
secondLeasedToAnotherStatus.getEventTimeMillis());
-    Assert.assertTrue(firstObtainedStatus.getConsensusDagAction().equals(
+    
Assert.assertTrue(secondLeasedToAnotherStatus.getConsensusDagAction().equals(
         new DagActionStore.DagAction(flowGroup2, flowName, flowExecutionId, 
jobName, DagActionStore.DagActionType.LAUNCH)));
+
+    
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(firstObtainedStatus));
   }
 }

Reply via email to