[
https://issues.apache.org/jira/browse/GOBBLIN-1837?focusedWorklogId=864383&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-864383
]
ASF GitHub Bot logged work on GOBBLIN-1837:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 08/Jun/23 08:54
Start Date: 08/Jun/23 08:54
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3700:
URL: https://github.com/apache/gobblin/pull/3700#discussion_r1222657174
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/SchedulerLeaseAlgoHandler.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Random;
+
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.LeaseAttemptStatus;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.scheduler.JobScheduler;
+import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.runtime.api.LeaseObtainedStatus;
+import org.apache.gobblin.runtime.api.LeasedToAnotherStatus;
+
+
+/**
+ * Handler used to coordinate multiple hosts with enabled schedulers to
respond to flow action events. It uses the
+ * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter} to
determine a single lease owner at a given time
+ * for a flow action event. After acquiring the lease, it persists the flow
action event to the {@link DagActionStore}
+ * to be eventually acted upon by the host with the active DagManager. Once it
has completed this action, it will mark
+ * the lease as completed by calling the
+ * {@link
org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter.completeLeaseUse}
method. Hosts that do not gain
+ * the lease for the event, instead schedule a reminder using the {@link
SchedulerService} to check back in on the
+ * previous lease owner's completion status after the lease should expire to
ensure the event is handled in failure
+ * cases.
+ */
+public class SchedulerLeaseAlgoHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(SchedulerLeaseAlgoHandler.class);
+ private final int staggerUpperBoundSec;
+ private static Random random = new Random();
+ protected MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+ protected JobScheduler jobScheduler;
+ protected SchedulerService schedulerService;
+ protected DagActionStore dagActionStore;
+ private MetricContext metricContext;
+ private ContextAwareMeter numLeasesCompleted;
+ @Inject
+ public SchedulerLeaseAlgoHandler(Config config, MultiActiveLeaseArbiter
leaseDeterminationStore,
+ JobScheduler jobScheduler, SchedulerService schedulerService,
DagActionStore dagActionStore) {
+ this.staggerUpperBoundSec = ConfigUtils.getInt(config,
+ ConfigurationKeys.SCHEDULER_STAGGERING_UPPER_BOUND_SEC_KEY,
+ ConfigurationKeys.DEFAULT_SCHEDULER_STAGGERING_UPPER_BOUND_SEC);
+ this.multiActiveLeaseArbiter = leaseDeterminationStore;
+ this.jobScheduler = jobScheduler;
+ this.schedulerService = schedulerService;
+ this.dagActionStore = dagActionStore;
+ this.metricContext = Instrumented.getMetricContext(new
org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
+ this.getClass());
+ this.numLeasesCompleted =
metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_SCHEDULER_LEASE_ALGO_HANDLER_NUM_LEASES_COMPLETED);
+ }
+
+ /**
+ * This method is used in the multi-active scheduler case for one or more
hosts to respond to a flow action event
+ * by attempting a lease for the flow event and processing the result
depending on the status of the attempt.
+ * @param jobProps
+ * @param flowAction
+ * @param eventTimeMillis
+ * @throws IOException
+ */
+ public void handleNewSchedulerEvent(Properties jobProps,
DagActionStore.DagAction flowAction, long eventTimeMillis)
+ throws IOException {
+ LeaseAttemptStatus leaseAttemptStatus =
+ multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
+ // TODO: add a log event or metric for each of these cases
+ switch (leaseAttemptStatus.getClass().getSimpleName()) {
+ case "LeaseObtainedStatus":
+ finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
+ break;
+ case "LeasedToAnotherStatus":
+ scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus)
leaseAttemptStatus, flowAction, eventTimeMillis);
+ break;
+ case "NoLongerLeasingStatus":
+ break;
+ default:
+ }
+ }
+
+ // Called after obtaining a lease to persist the flow action to {@link
DagActionStore} and mark the lease as done
+ private boolean finalizeLease(LeaseObtainedStatus status,
DagActionStore.DagAction flowAction) {
+ try {
+ this.dagActionStore.addDagAction(flowAction.getFlowGroup(),
flowAction.getFlowName(),
+ flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
+ if (this.dagActionStore.exists(flowAction.getFlowGroup(),
flowAction.getFlowName(),
+ flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
+ // If the flow action has been persisted to the {@link DagActionStore}
we can close the lease
+ this.numLeasesCompleted.mark();
+ return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction,
status.getEventTimestamp(),
+ status.getMyLeaseAcquisitionTimestamp());
+ }
+ } catch (IOException | SQLException e) {
+ throw new RuntimeException(e);
+ }
+ // TODO: should this return an error or print a warning log if failed to
commit to dag action store?
+ return false;
+ }
+
+ /**
+ * This method is used by {@link
SchedulerLeaseAlgoHandler.handleNewSchedulerEvent} to schedule a reminder for
itself
+ * to check on the other participant's progress to finish acting on a flow
action after the time the lease should
+ * expire.
+ * @param jobProps
+ * @param status used to extract event to be reminded for and the minimum
time after which reminder should occur
+ * @param originalEventTimeMillis the event timestamp we were originally
handling
+ * @param flowAction
+ */
+ private void scheduleReminderForEvent(Properties jobProps,
LeasedToAnotherStatus status,
+ DagActionStore.DagAction flowAction, long originalEventTimeMillis) {
+ // Add a small randomization to the minimum reminder wait time to avoid
'thundering herd' issue
+ String cronExpression =
createCronFromDelayPeriod(status.getMinimumReminderWaitMillis() +
random.nextInt(staggerUpperBoundSec));
+ jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
+ // Ensure we save the event timestamp that we're setting reminder for, in
addition to our own event timestamp which may be different
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY,
String.valueOf(status.getReminderEventTimeMillis()));
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY,
String.valueOf(status.getReminderEventTimeMillis()));
+ JobKey key = new JobKey(flowAction.getFlowName(),
flowAction.getFlowGroup());
+ Trigger trigger = this.jobScheduler.getTrigger(key, jobProps);
Review Comment:
sorry, I guess I'm unfamiliar: what's the meaning/nature of this `Trigger`
we get from one scheduler and give to another?
(I'd love to avoid bringing in the dependency on `JobScheduler` if we can
avoid it, and instead have this class depend only on the scheduler.)
Issue Time Tracking
-------------------
Worklog Id: (was: 864383)
Time Spent: 10h 20m (was: 10h 10m)
> Implement multi-active, non blocking for leader host
> ----------------------------------------------------
>
> Key: GOBBLIN-1837
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1837
> Project: Apache Gobblin
> Issue Type: Bug
> Components: gobblin-service
> Reporter: Urmi Mustafi
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 10h 20m
> Remaining Estimate: 0h
>
> This task will include the implementation of a non-blocking, multi-active
> scheduler for each host. It will NOT include metric emission or unit tests
> for validation. That will be done in a separate follow-up ticket. The work in
> this ticket includes:
> * define a table to do scheduler lease determination for each flow's trigger
> event and related methods to execute actions on this table
> * update DagActionStore schema and DagActionStoreMonitor to act upon new
> "LAUNCH" type events in addition to KILL/RESUME
> * update scheduler/orchestrator logic to apply the non-blocking algorithm
> when "multi-active scheduler mode" is enabled, otherwise submit events
> directly to the DagManager after receiving a scheduler trigger
> * implement the non-blocking algorithm, particularly handling reminder
> events if another host is in the process of securing the lease for a
> particular flow trigger
--
This message was sent by Atlassian Jira
(v8.20.10#820010)