[ 
https://issues.apache.org/jira/browse/GOBBLIN-1837?focusedWorklogId=864383&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-864383
 ]

ASF GitHub Bot logged work on GOBBLIN-1837:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Jun/23 08:54
            Start Date: 08/Jun/23 08:54
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3700:
URL: https://github.com/apache/gobblin/pull/3700#discussion_r1222657174


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/SchedulerLeaseAlgoHandler.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Random;
+
+import org.quartz.JobKey;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.LeaseAttemptStatus;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
+import org.apache.gobblin.scheduler.JobScheduler;
+import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.runtime.api.LeaseObtainedStatus;
+import org.apache.gobblin.runtime.api.LeasedToAnotherStatus;
+
+
+/**
+ * Handler used to coordinate multiple hosts with enabled schedulers to respond to flow action events. It uses the
+ * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter} to determine a single lease owner at a given time
+ * for a flow action event. After acquiring the lease, it persists the flow action event to the {@link DagActionStore}
+ * to be eventually acted upon by the host with the active DagManager. Once it has completed this action, it will mark
+ * the lease as completed by calling the
+ * {@link org.apache.gobblin.runtime.api.MySQLMultiActiveLeaseArbiter#completeLeaseUse} method. Hosts that do not gain
+ * the lease for the event instead schedule a reminder using the {@link SchedulerService} to check back in on the
+ * previous lease owner's completion status after the lease should expire to ensure the event is handled in failure
+ * cases.
+ */
+public class SchedulerLeaseAlgoHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(SchedulerLeaseAlgoHandler.class);
+  private final int staggerUpperBoundSec;
+  private static Random random = new Random();
+  protected MultiActiveLeaseArbiter multiActiveLeaseArbiter;
+  protected JobScheduler jobScheduler;
+  protected SchedulerService schedulerService;
+  protected DagActionStore dagActionStore;
+  private MetricContext metricContext;
+  private ContextAwareMeter numLeasesCompleted;
+  @Inject
+  public SchedulerLeaseAlgoHandler(Config config, MultiActiveLeaseArbiter leaseDeterminationStore,
+      JobScheduler jobScheduler, SchedulerService schedulerService, DagActionStore dagActionStore) {
+    this.staggerUpperBoundSec = ConfigUtils.getInt(config,
+        ConfigurationKeys.SCHEDULER_STAGGERING_UPPER_BOUND_SEC_KEY,
+        ConfigurationKeys.DEFAULT_SCHEDULER_STAGGERING_UPPER_BOUND_SEC);
+    this.multiActiveLeaseArbiter = leaseDeterminationStore;
+    this.jobScheduler = jobScheduler;
+    this.schedulerService = schedulerService;
+    this.dagActionStore = dagActionStore;
+    this.metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(ConfigUtils.configToProperties(config)),
+        this.getClass());
+    this.numLeasesCompleted = metricContext.contextAwareMeter(RuntimeMetrics.GOBBLIN_SCHEDULER_LEASE_ALGO_HANDLER_NUM_LEASES_COMPLETED);
+  }
+
+  /**
+   * This method is used in the multi-active scheduler case for one or more hosts to respond to a flow action event
+   * by attempting a lease for the flow event and processing the result depending on the status of the attempt.
+   * @param jobProps
+   * @param flowAction
+   * @param eventTimeMillis
+   * @throws IOException
+   */
+  public void handleNewSchedulerEvent(Properties jobProps, DagActionStore.DagAction flowAction, long eventTimeMillis)
+      throws IOException {
+    LeaseAttemptStatus leaseAttemptStatus =
+        multiActiveLeaseArbiter.tryAcquireLease(flowAction, eventTimeMillis);
+    // TODO: add a log event or metric for each of these cases
+    switch (leaseAttemptStatus.getClass().getSimpleName()) {
+      case "LeaseObtainedStatus":
+        finalizeLease((LeaseObtainedStatus) leaseAttemptStatus, flowAction);
+        break;
+      case "LeasedToAnotherStatus":
+        scheduleReminderForEvent(jobProps, (LeasedToAnotherStatus) leaseAttemptStatus, flowAction, eventTimeMillis);
+        break;
+      case "NoLongerLeasingStatus":
+        break;
+      default:
+    }
+  }
+
+  // Called after obtaining a lease to persist the flow action to {@link DagActionStore} and mark the lease as done
+  private boolean finalizeLease(LeaseObtainedStatus status, DagActionStore.DagAction flowAction) {
+    try {
+      this.dagActionStore.addDagAction(flowAction.getFlowGroup(), flowAction.getFlowName(),
+          flowAction.getFlowExecutionId(), flowAction.getFlowActionType());
+      if (this.dagActionStore.exists(flowAction.getFlowGroup(), flowAction.getFlowName(),
+          flowAction.getFlowExecutionId(), flowAction.getFlowActionType())) {
+        // If the flow action has been persisted to the {@link DagActionStore} we can close the lease
+        this.numLeasesCompleted.mark();
+        return this.multiActiveLeaseArbiter.completeLeaseUse(flowAction, status.getEventTimestamp(),
+            status.getMyLeaseAcquisitionTimestamp());
+      }
+    } catch (IOException | SQLException e) {
+      throw new RuntimeException(e);
+    }
+    // TODO: should this return an error or print a warning log if failed to commit to dag action store?
+    return false;
+  }
+
+  /**
+   * This method is used by {@link SchedulerLeaseAlgoHandler#handleNewSchedulerEvent} to schedule a reminder for itself
+   * to check on the other participant's progress to finish acting on a flow action after the time the lease should
+   * expire.
+   * @param jobProps
+   * @param status used to extract event to be reminded for and the minimum time after which reminder should occur
+   * @param flowAction
+   * @param originalEventTimeMillis the event timestamp we were originally handling
+   */
+  private void scheduleReminderForEvent(Properties jobProps, LeasedToAnotherStatus status,
+      DagActionStore.DagAction flowAction, long originalEventTimeMillis) {
+    // Add a small randomization (converted from seconds to millis) to the minimum reminder wait time to avoid 'thundering herd' issue
+    String cronExpression = createCronFromDelayPeriod(status.getMinimumReminderWaitMillis() + random.nextInt(staggerUpperBoundSec) * 1000L);
+    jobProps.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, cronExpression);
+    // Ensure we save the event timestamp that we're setting reminder for, in addition to our own event timestamp which may be different
+    jobProps.setProperty(ConfigurationKeys.SCHEDULER_REMINDER_EVENT_TIMESTAMP_MILLIS_KEY, String.valueOf(status.getReminderEventTimeMillis()));
+    jobProps.setProperty(ConfigurationKeys.SCHEDULER_NEW_EVENT_TIMESTAMP_MILLIS_KEY, String.valueOf(originalEventTimeMillis));
+    JobKey key = new JobKey(flowAction.getFlowName(), flowAction.getFlowGroup());
+    Trigger trigger = this.jobScheduler.getTrigger(key, jobProps);

Review Comment:
   sorry, I guess I'm unfamiliar: what's the meaning/nature of this `Trigger` we get from one scheduler and give to another?
   
   (I'd love to avoid bringing in the dependency on `JobScheduler`, and instead have this class depend only on the `SchedulerService`.)
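For context on the `Trigger` question: the reminder built in `scheduleReminderForEvent` is a one-shot schedule, i.e. a cron expression pinned to a single future instant equal to the minimum reminder wait plus a random stagger. The sketch below illustrates that computation under stated assumptions. The helper names (`reminderDelayMillis`, `cronFromDelay`, `cronFromDelayUtcNow`) are hypothetical, the body of `createCronFromDelayPeriod` is not part of the quoted hunk, and the Quartz 7-field format `ss mm HH dd MM ? yyyy` in UTC is inferred from the imports rather than taken from the PR. The stagger is converted from seconds to milliseconds before it is added, since `staggerUpperBoundSec` is in seconds.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Locale;
import java.util.Random;

// Hypothetical helpers; the real createCronFromDelayPeriod body is not shown in the hunk above.
final class ReminderCronSketch {
  // Quartz cron fields: seconds minutes hours day-of-month month day-of-week year.
  // '?' leaves day-of-week unspecified, and the explicit year makes the schedule fire at most once.
  private static final DateTimeFormatter QUARTZ_ONE_SHOT =
      DateTimeFormatter.ofPattern("ss mm HH dd MM ? yyyy", Locale.US);

  /** Minimum wait plus a random stagger of 0..staggerUpperBoundSec-1 whole seconds, all in millis. */
  static long reminderDelayMillis(long minimumReminderWaitMillis, int staggerUpperBoundSec, Random random) {
    // Random.nextInt(bound) throws for bound <= 0, so treat a non-positive bound as "no stagger".
    long staggerMillis = staggerUpperBoundSec > 0 ? random.nextInt(staggerUpperBoundSec) * 1000L : 0L;
    return minimumReminderWaitMillis + staggerMillis;
  }

  /** One-shot cron expression for the instant delayMillis after `now` (sub-second precision is dropped). */
  static String cronFromDelay(LocalDateTime now, long delayMillis) {
    return now.plus(delayMillis, ChronoUnit.MILLIS).format(QUARTZ_ONE_SHOT);
  }

  /** Same, relative to the current UTC wall-clock time. */
  static String cronFromDelayUtcNow(long delayMillis) {
    return cronFromDelay(LocalDateTime.now(ZoneId.of("UTC")), delayMillis);
  }

  public static void main(String[] args) {
    long delay = reminderDelayMillis(30_000L, 0, new Random());
    // Fixed "now" so the output is reproducible.
    System.out.println(cronFromDelay(LocalDateTime.of(2023, 6, 8, 8, 54, 0), delay)); // 30 54 08 08 06 ? 2023
    System.out.println(cronFromDelayUtcNow(reminderDelayMillis(30_000L, 5, new Random())));
  }
}
```

Quartz evaluates a cron expression in the trigger's time zone, so generating it from UTC wall-clock time assumes the scheduler is also configured for UTC. The stagger only spreads out hosts that lost the same lease over a few seconds; it does not remove contention on the lease itself.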





Issue Time Tracking
-------------------

    Worklog Id:     (was: 864383)
    Time Spent: 10h 20m  (was: 10h 10m)

> Implement multi-active, non blocking for leader host
> ----------------------------------------------------
>
>                 Key: GOBBLIN-1837
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1837
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 10h 20m
>  Remaining Estimate: 0h
>
> This task will include the implementation of non-blocking, multi-active 
> scheduler for each host. It will NOT include metric emission or unit tests 
> for validation. That will be done in a separate follow-up ticket. The work in 
> this ticket includes:
>  * define a table to do scheduler lease determination for each flow's trigger 
> event and related methods to execute actions on this table
>  * update DagActionStore schema and DagActionStoreMonitor to act upon new 
> "LAUNCH" type events in addition to KILL/RESUME
>  * update scheduler/orchestrator logic to apply the non-blocking algorithm 
> when "multi-active scheduler mode" is enabled, otherwise submit events 
> directly to the DagManager after receiving a scheduler trigger
>  * implement the non-blocking algorithm, particularly handling reminder 
> events if another host is in the process of securing the lease for a 
> particular flow trigger
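The non-blocking algorithm described in the last bullet can be modeled in memory to make the three lease outcomes concrete. This is a minimal sketch, not Gobblin's `MySQLMultiActiveLeaseArbiter`: the class `InMemoryLeaseSketch`, its method signatures, the string event key, and the fixed lease length are all hypothetical. The first host to attempt a lease on an event obtains it; other hosts are told how long until that lease expires, which they can use as the minimum reminder wait; once the owner completes the lease, later attempts learn that no lease is needed; and an owner that fails to complete before expiry loses the lease to the next attempt.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of a multi-active lease arbiter (NOT Gobblin's MySQL-backed class).
final class InMemoryLeaseSketch {
  /** Outcome of a lease attempt, loosely mirroring the three statuses the handler switches on. */
  abstract static class Attempt {}

  static final class Obtained extends Attempt {
    final long acquiredAtMillis; // the caller must present this to complete the lease
    Obtained(long acquiredAtMillis) { this.acquiredAtMillis = acquiredAtMillis; }
  }

  static final class LeasedToAnother extends Attempt {
    final long minimumReminderWaitMillis; // time left until the current lease expires
    LeasedToAnother(long minimumReminderWaitMillis) { this.minimumReminderWaitMillis = minimumReminderWaitMillis; }
  }

  static final class NoLongerLeasing extends Attempt {}

  /** One row per trigger event: current owner, when it acquired the lease, and whether it finished. */
  private static final class Lease {
    final String owner;
    final long acquiredAtMillis;
    boolean completed;
    Lease(String owner, long acquiredAtMillis) { this.owner = owner; this.acquiredAtMillis = acquiredAtMillis; }
  }

  private final long leaseLengthMillis;
  private final Map<String, Lease> leases = new HashMap<>();

  InMemoryLeaseSketch(long leaseLengthMillis) { this.leaseLengthMillis = leaseLengthMillis; }

  /** eventKey identifies one flow action trigger event, e.g. "group/name/LAUNCH@eventTime" (hypothetical format). */
  synchronized Attempt tryAcquireLease(String eventKey, String host, long nowMillis) {
    Lease lease = leases.get(eventKey);
    if (lease != null && lease.completed) {
      return new NoLongerLeasing(); // the owner already persisted the action
    }
    if (lease == null || nowMillis >= lease.acquiredAtMillis + leaseLengthMillis) {
      // No lease yet, or the previous owner let it expire without completing: take it over.
      leases.put(eventKey, new Lease(host, nowMillis));
      return new Obtained(nowMillis);
    }
    return new LeasedToAnother(lease.acquiredAtMillis + leaseLengthMillis - nowMillis);
  }

  /** Succeeds only for the current owner of this exact lease; stale owners are rejected. */
  synchronized boolean completeLeaseUse(String eventKey, String host, long acquiredAtMillis) {
    Lease lease = leases.get(eventKey);
    if (lease == null || lease.completed || !lease.owner.equals(host) || lease.acquiredAtMillis != acquiredAtMillis) {
      return false;
    }
    lease.completed = true;
    return true;
  }

  /** Type-based dispatch, analogous to the switch in handleNewSchedulerEvent. */
  static String handle(Attempt attempt) {
    if (attempt instanceof Obtained) {
      return "persist action, then completeLeaseUse";
    }
    if (attempt instanceof LeasedToAnother) {
      return "schedule reminder in " + ((LeasedToAnother) attempt).minimumReminderWaitMillis + " ms";
    }
    return "nothing to do";
  }

  public static void main(String[] args) {
    InMemoryLeaseSketch arbiter = new InMemoryLeaseSketch(60_000L);
    String key = "myGroup/myFlow/LAUNCH@1000";
    Attempt a = arbiter.tryAcquireLease(key, "hostA", 1_000L);
    Attempt b = arbiter.tryAcquireLease(key, "hostB", 11_000L);
    System.out.println(handle(a)); // persist action, then completeLeaseUse
    System.out.println(handle(b)); // schedule reminder in 50000 ms
    arbiter.completeLeaseUse(key, "hostA", ((Obtained) a).acquiredAtMillis);
    System.out.println(handle(arbiter.tryAcquireLease(key, "hostB", 61_000L))); // nothing to do
  }
}
```

Dispatching on the status with `instanceof` (as `handle` does here) lets the compiler catch a renamed status class, whereas matching on `getClass().getSimpleName()` strings silently falls through to the `default` branch if a name changes.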



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
