phet commented on code in PR #3776:
URL: https://github.com/apache/gobblin/pull/3776#discussion_r1343147474
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java:
##########
@@ -311,4 +315,40 @@ protected static long getUTCTimeFromDelayPeriod(long
delayPeriodMillis) {
Date date = Date.from(localDateTime.atZone(ZoneId.of("UTC")).toInstant());
return GobblinServiceJobScheduler.utcDateAsUTCEpochMillis(date);
}
+
+ /**
+ * Attempts to acquire lease for a given {@link
org.apache.gobblin.runtime.api.DagActionStore.DagAction}
+ * through lease arbitration and if it fails, it will create and schedule a
reminder trigger to check back again.
+ * @param jobProps
+ * @param flowAction
+ * @param eventTimeMillis
+ * @return optionally leaseObtainedStatus if acquired; otherwise schedule
reminder to check back again.
+ * @throws IOException
+ */
+ public MultiActiveLeaseArbiter.LeaseAttemptStatus
getLeaseOnDagAction(Properties jobProps, DagActionStore.DagAction flowAction,
long eventTimeMillis) throws IOException {
+
+ if (multiActiveLeaseArbiter.isPresent()) {
+ MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis);
+ if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
+ this.leaseObtainedCount.inc();
+ log.info("Successfully acquired lease for dag action: {}", flowAction);
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
+ this.leasedToAnotherStatusCount.inc();
+ scheduleReminderForEvent(jobProps,
+ (MultiActiveLeaseArbiter.LeasedToAnotherStatus)
leaseAttemptStatus, eventTimeMillis);
+ } else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
+ this.noLongerLeasingStatusCount.inc();
+ log.info("Received type of leaseAttemptStatus: [{}, eventTimestamp:
{}] ", leaseAttemptStatus.getClass().getName(),
+ eventTimeMillis);
+ }
+ return leaseAttemptStatus;
+ } else {
+ throw new RuntimeException(String.format("Multi-active scheduler is not
enabled so trigger event should not be "
+ + "handled with this method."));
+ }
+ }
+
+ public MultiActiveLeaseArbiter getMultiActiveLeaseArbiter() {
+ return this.multiActiveLeaseArbiter.get();
+ }
Review Comment:
no need for this, if you indeed modify the existing `handleTriggerEvent` on
line 109 for reuse
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]