[
https://issues.apache.org/jira/browse/GOBBLIN-1837?focusedWorklogId=863084&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-863084
]
ASF GitHub Bot logged work on GOBBLIN-1837:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 31/May/23 15:57
Start Date: 31/May/23 15:57
Worklog Time Spent: 10m
Work Description: Will-Lo commented on code in PR #3700:
URL: https://github.com/apache/gobblin/pull/3700#discussion_r1211941503
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/SchedulerLeaseAlgoHandler.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Random;
+
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.SchedulerLeaseDeterminationStore;
+import org.apache.gobblin.scheduler.JobScheduler;
+import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+public class SchedulerLeaseAlgoHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(SchedulerLeaseAlgoHandler.class);
+ private final long linger;
+ private final int staggerUpperBoundSec;
+ private static Random random = new Random();
+ protected SchedulerLeaseDeterminationStore leaseDeterminationStore;
+ protected JobScheduler jobScheduler;
+ protected SchedulerService schedulerService;
+ @Inject
+ public SchedulerLeaseAlgoHandler(Config config,
SchedulerLeaseDeterminationStore leaseDeterminationStore,
+ JobScheduler jobScheduler, SchedulerService schedulerService)
+ throws IOException {
+ this.linger = ConfigUtils.getLong(config,
ConfigurationKeys.SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_TRIGGER_EVENT_EPSILON_MILLIS);
+ this.staggerUpperBoundSec = ConfigUtils.getInt(config,
+ ConfigurationKeys.SCHEDULER_STAGGERING_UPPER_BOUND_SEC_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_STAGGERING_UPPER_BOUND_SEC);
+ this.leaseDeterminationStore = leaseDeterminationStore;
+ this.jobScheduler = jobScheduler;
+ this.schedulerService = schedulerService;
+ }
+ private SchedulerLeaseDeterminationStore schedulerLeaseDeterminationStore;
+
+ /**
+ * This method is used in the multi-active scheduler case for one or more
hosts to respond to a flow's trigger event
+ * by attempting a lease for the flow event.
+ * @param jobProps
+ * @param flowGroup
+ * @param flowName
+ * @param flowExecutionId
+ * @param flowActionType
+ * @param triggerTimeMillis
+ * @return true if this host obtained the lease for this flow's trigger
event, false otherwise.
+ * @throws IOException
+ */
+ public boolean handleNewTriggerEvent(Properties jobProps, String flowGroup,
String flowName, String flowExecutionId,
+ SchedulerLeaseDeterminationStore.FlowActionType flowActionType, long
triggerTimeMillis)
+ throws IOException {
+ SchedulerLeaseDeterminationStore.LeaseAttemptStatus leaseAttemptStatus =
+
schedulerLeaseDeterminationStore.attemptInsertAndGetPursuantTimestamp(flowGroup,
flowName, flowExecutionId,
+ flowActionType, triggerTimeMillis);
+ // TODO: add a log event or metric for each of these cases
+ switch (leaseAttemptStatus) {
+ case LEASE_OBTAINED:
+ return true;
+ case PREVIOUS_LEASE_EXPIRED:
+ // recursively try obtaining lease again immediately, stops when
reaches one of the other cases
+ return handleNewTriggerEvent(jobProps, flowGroup, flowName,
flowExecutionId, flowActionType, triggerTimeMillis);
+ case PREVIOUS_LEASE_VALID:
+ scheduleReminderForTriggerEvent(jobProps, flowGroup, flowName,
flowExecutionId, flowActionType, triggerTimeMillis);
+ }
+ return false;
+ }
+
+ /**
+ * This method is used by {@link
SchedulerLeaseAlgoHandler.handleNewTriggerEvent} to schedule a reminder for
itself to
+ * check on the other participant's progress during pursuing orchestration
after the time the lease should expire.
+ * If the previous participant was successful, then no further action is
taken otherwise we re-attempt pursuing
+ * orchestration ourselves.
+ * @param flowGroup
+ * @param flowName
+ * @param flowExecutionId
+ * @param flowActionType
+ * @param triggerTimeMillis
+ */
+ protected void scheduleReminderForTriggerEvent(Properties jobProps, String
flowGroup, String flowName, String flowExecutionId,
+ SchedulerLeaseDeterminationStore.FlowActionType flowActionType, long
triggerTimeMillis) {
+ // Check-in `linger` time after the current timestamp which is
"close-enough" to the time the pursuant attempted
+ // the flow action. We also add a small randomization to avoid 'thundering
herd' issue
+ String cronExpression = createCronFromDelayPeriod(linger +
random.nextInt(staggerUpperBoundSec));
+ jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
+ // This timestamp is what will be used to identify the particular flow
trigger event it's associated with
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_ORIGINAL_TRIGGER_TIMESTAMP_MILLIS_KEY,
String.valueOf(triggerTimeMillis));
+ JobKey key = new JobKey(flowName, flowGroup);
+ Trigger trigger = this.jobScheduler.getTrigger(key, jobProps);
+ try {
+ LOG.info("Attempting to add job reminder to Scheduler Service where job
is %s trigger event %s and reminder is at "
+ + "%s.", key, triggerTimeMillis, trigger.getNextFireTime());
+ this.schedulerService.getScheduler().scheduleJob(trigger);
+ } catch (SchedulerException e) {
+ LOG.warn("Failed to add job reminder due to SchedulerException for job
%s trigger event %s ", key, triggerTimeMillis, e);
+ }
+ LOG.info(String.format("Scheduled reminder for job %s trigger event %s.
Next run: %s.", key, triggerTimeMillis, trigger.getNextFireTime()));
+ }
+
+ /**
+ * These methods should only be called from the Orchestrator or JobScheduler
classes as it directly adds jobs to the
+ * Quartz scheduler
+ * @param delayPeriodSeconds
+ * @return
+ */
+ protected static String createCronFromDelayPeriod(long delayPeriodSeconds) {
+ LocalDateTime now = LocalDateTime.now();
+ LocalDateTime delaySecondsLater = now.plus(delayPeriodSeconds,
ChronoUnit.SECONDS);
+ // TODO: investigate potentially better way of generating cron expression
that does not make it US dependent
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("ss mm HH dd MM
? yyyy", Locale.US);
Review Comment:
Does `LocalDateTime.now()` default to the US timezone? If not, this could cause
issues. I would suggest you use `LocalDateTime.now(<timezone>)` to ensure
consistency in this system across timezones.
Also, I think the GaaS scheduler defaults to UTC.
Issue Time Tracking
-------------------
Worklog Id: (was: 863084)
Time Spent: 5h 50m (was: 5h 40m)
> Implement multi-active, non blocking for leader host
> ----------------------------------------------------
>
> Key: GOBBLIN-1837
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1837
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 5h 50m
> Remaining Estimate: 0h
>
> This task will include the implementation of a non-blocking, multi-active
> scheduler for each host. It will NOT include metric emission or unit tests
> for validation; those will be done in a separate follow-up ticket. The work in
> this ticket includes:
> * define a table to do scheduler lease determination for each flow's trigger
> event and related methods to execute actions on this table
> * update DagActionStore schema and DagActionStoreMonitor to act upon new
> "LAUNCH" type events in addition to KILL/RESUME
> * update scheduler/orchestrator logic to apply the non-blocking algorithm
> when "multi-active scheduler mode" is enabled, otherwise submit events
> directly to the DagManager after receiving a scheduler trigger
> * implement the non-blocking algorithm, particularly handling reminder
> events if another host is in the process of securing the lease for a
> particular flow trigger
--
This message was sent by Atlassian Jira
(v8.20.10#820010)