[ 
https://issues.apache.org/jira/browse/GOBBLIN-1910?focusedWorklogId=882995&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-882995
 ]

ASF GitHub Bot logged work on GOBBLIN-1910:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Oct/23 21:08
            Start Date: 02/Oct/23 21:08
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3776:
URL: https://github.com/apache/gobblin/pull/3776#discussion_r1343147474


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -311,4 +315,40 @@ protected static long getUTCTimeFromDelayPeriod(long delayPeriodMillis) {
     Date date = Date.from(localDateTime.atZone(ZoneId.of("UTC")).toInstant());
     return GobblinServiceJobScheduler.utcDateAsUTCEpochMillis(date);
   }
+
+  /**
+   * Attempts to acquire lease for a given {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}
+   * through lease arbitration and if it fails, it will create and schedule a reminder trigger to check back again.
+   * @param jobProps
+   * @param flowAction
+   * @param eventTimeMillis
+   * @return optionally leaseObtainedStatus if acquired; otherwise schedule reminder to check back again.
+   * @throws IOException
+   */
+  public MultiActiveLeaseArbiter.LeaseAttemptStatus getLeaseOnDagAction(Properties jobProps, DagActionStore.DagAction flowAction, long eventTimeMillis) throws IOException {
+
+    if (multiActiveLeaseArbiter.isPresent()) {
+      MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis);
+      if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+        this.leaseObtainedCount.inc();
+        log.info("Successfully acquired lease for dag action: {}", flowAction);
+      } else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+        this.leasedToAnotherStatusCount.inc();
+        scheduleReminderForEvent(jobProps,
+            (MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, eventTimeMillis);
+      } else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+        this.noLongerLeasingStatusCount.inc();
+        log.info("Received type of leaseAttemptStatus: [{}, eventTimestamp: {}] ", leaseAttemptStatus.getClass().getName(),
+            eventTimeMillis);
+      }
+      return leaseAttemptStatus;
+    } else {
+      throw new RuntimeException(String.format("Multi-active scheduler is not enabled so trigger event should not be "
+          + "handled with this method."));
+    }
+  }
+
+  public MultiActiveLeaseArbiter getMultiActiveLeaseArbiter() {
+    return this.multiActiveLeaseArbiter.get();
+  }

Review Comment:
   no need for this, if you indeed modify the existing `handleTriggerEvent` on 
line 109 for reuse





Issue Time Tracking
-------------------

    Worklog Id:     (was: 882995)
    Time Spent: 12h 50m  (was: 12h 40m)

> Refactor code to move current in-memory references to new design for REST 
> calls: Launch, Resume and Kill
> --------------------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1910
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1910
>             Project: Apache Gobblin
>          Issue Type: New Feature
>            Reporter: Meeth Gala
>            Priority: Major
>          Time Spent: 12h 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)