phet commented on code in PR #3776:
URL: https://github.com/apache/gobblin/pull/3776#discussion_r1343147474


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -311,4 +315,40 @@ protected static long getUTCTimeFromDelayPeriod(long delayPeriodMillis) {
     Date date = Date.from(localDateTime.atZone(ZoneId.of("UTC")).toInstant());
     return GobblinServiceJobScheduler.utcDateAsUTCEpochMillis(date);
   }
+
+  /**
+   * Attempts to acquire lease for a given {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}
+   * through lease arbitration and if it fails, it will create and schedule a reminder trigger to check back again.
+   * @param jobProps
+   * @param flowAction
+   * @param eventTimeMillis
+   * @return optionally leaseObtainedStatus if acquired; otherwise schedule reminder to check back again.
+   * @throws IOException
+   */
+  public MultiActiveLeaseArbiter.LeaseAttemptStatus getLeaseOnDagAction(Properties jobProps, DagActionStore.DagAction flowAction, long eventTimeMillis) throws IOException {
+
+    if (multiActiveLeaseArbiter.isPresent()) {
+      MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis);
+      if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+        this.leaseObtainedCount.inc();
+        log.info("Successfully acquired lease for dag action: {}", flowAction);
+      } else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+        this.leasedToAnotherStatusCount.inc();
+        scheduleReminderForEvent(jobProps,
+            (MultiActiveLeaseArbiter.LeasedToAnotherStatus) leaseAttemptStatus, eventTimeMillis);
+      } else if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+        this.noLongerLeasingStatusCount.inc();
+        log.info("Received type of leaseAttemptStatus: [{}, eventTimestamp: {}] ", leaseAttemptStatus.getClass().getName(),
+            eventTimeMillis);
+      }
+      return leaseAttemptStatus;
+    } else {
+      throw new RuntimeException(String.format("Multi-active scheduler is not enabled so trigger event should not be "
+          + "handled with this method."));
+    }
+  }
+
+  public MultiActiveLeaseArbiter getMultiActiveLeaseArbiter() {
+    return this.multiActiveLeaseArbiter.get();
+  }

Review Comment:
   No need for this if you do modify the existing `handleTriggerEvent` (line 109) for reuse.


