This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5af6bca57 [GOBBLIN-1846] Validate Multi-active Scheduler with Logs 
(#3707)
5af6bca57 is described below

commit 5af6bca57df909e44b995e5b2d667c70e0399877
Author: umustafi <[email protected]>
AuthorDate: Thu Jun 22 14:04:05 2023 -0700

    [GOBBLIN-1846] Validate Multi-active Scheduler with Logs (#3707)
    
    * [GOBBLIN-1846] Validate Multi-active Scheduler with Logs
    
    * Adds logging at 3 critical points to ensure no scheduled events are missed
            - when jobs are first scheduled (or rescheduled after an update)
            - each time the scheduler triggers a job
            - when jobs are unscheduled
    
    * move flow related logic to GobblinServiceJobScheduler
    
    * rename and reuse util
    
    * fix typo to pass tests
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../gobblin/configuration/ConfigurationKeys.java   |  4 +--
 .../org/apache/gobblin/scheduler/JobScheduler.java | 13 +++-----
 .../scheduler/GobblinServiceJobScheduler.java      | 37 ++++++++++++++++++++--
 3 files changed, 42 insertions(+), 12 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index b155e8089..2d60fd5c8 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -98,9 +98,9 @@ public class ConfigurationKeys {
   // Scheduler lease determination store configuration
   public static final String MYSQL_LEASE_ARBITER_PREFIX = 
"MysqlMultiActiveLeaseArbiter";
   public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY = 
MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
-  public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE 
= MYSQL_LEASE_ARBITER_PREFIX + 
".gobblin_multi_active_scheduler_constants_store";
+  public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE 
= "gobblin_multi_active_scheduler_constants_store";
   public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY 
= MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiterTable";
-  public static final String 
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = 
MYSQL_LEASE_ARBITER_PREFIX + ".gobblin_scheduler_lease_determination_store";
+  public static final String 
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE = 
"gobblin_scheduler_lease_determination_store";
   public static final String SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY = 
"eventToRevisitTimestampMillis";
   public static final String SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY = 
"triggerEventTimestampMillis";
   public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = 
MYSQL_LEASE_ARBITER_PREFIX + ".epsilonMillis";
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
index 56b1ac8c0..c49de3c6a 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
@@ -398,7 +398,7 @@ public class JobScheduler extends AbstractIdleService {
       // Schedule the Quartz job with a trigger built from the job 
configuration
       Trigger trigger = createTriggerForJob(job.getKey(), jobProps);
       this.scheduler.getScheduler().scheduleJob(job, trigger);
-      LOG.info(String.format("Scheduled job %s. Next run: %s.", job.getKey(), 
trigger.getNextFireTime()));
+      logNewlyScheduledJob(job, trigger);
     } catch (SchedulerException se) {
       LOG.error("Failed to schedule job " + jobName, se);
       throw new JobException("Failed to schedule job " + jobName, se);
@@ -407,6 +407,10 @@ public class JobScheduler extends AbstractIdleService {
     this.scheduledJobs.put(jobName, job.getKey());
   }
 
+  protected void logNewlyScheduledJob(JobDetail job, Trigger trigger) {
+    LOG.info(String.format("Scheduled job %s. Next run: %s.", job.getKey(), 
trigger.getNextFireTime()));
+  }
+
   /**
    * Unschedule and delete a job.
    *
@@ -606,13 +610,6 @@ public class JobScheduler extends AbstractIdleService {
       JobScheduler jobScheduler = (JobScheduler) 
dataMap.get(JOB_SCHEDULER_KEY);
       Properties jobProps = (Properties) dataMap.get(PROPERTIES_KEY);
       JobListener jobListener = (JobListener) dataMap.get(JOB_LISTENER_KEY);
-      // Obtain trigger timestamp from trigger to pass to jobProps
-      Trigger trigger = context.getTrigger();
-      // THIS current event has already fired if this method is called, so it 
now exists in <previousFireTime>
-      long triggerTimestampMillis = trigger.getPreviousFireTime().getTime();
-      
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
-          String.valueOf(triggerTimestampMillis));
-
       try {
         jobScheduler.runJob(jobProps, jobListener);
       } catch (Throwable t) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index b62a869ba..5e0ac3988 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -30,14 +30,17 @@ import java.util.Properties;
 import java.util.TimeZone;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.helix.HelixManager;
 import org.quartz.CronExpression;
 import org.quartz.DisallowConcurrentExecution;
 import org.quartz.InterruptableJob;
 import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
 import org.quartz.SchedulerException;
+import org.quartz.Trigger;
 import org.quartz.UnableToInterruptJobException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -96,6 +99,7 @@ import static 
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFI
  */
 @Alpha
 @Singleton
+@Slf4j
 public class GobblinServiceJobScheduler extends JobScheduler implements 
SpecCatalogListener {
 
   // Scheduler related configuration
@@ -442,6 +446,19 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
     }
   }
 
+  @Override
+  protected void logNewlyScheduledJob(JobDetail job, Trigger trigger) {
+    Properties jobProps = (Properties) job.getJobDataMap().get(PROPERTIES_KEY);
+    log.info("{}nextTriggerTime: {} - Job newly scheduled", // trace prefix passed as an argument so '{}' in flow names is never parsed as a placeholder
+        jobSchedulerTracePrefixBuilder(jobProps), trigger.getNextFireTime());
+  }
+
+  protected static String jobSchedulerTracePrefixBuilder(Properties jobProps) {
+    return String.format("Scheduler trigger tracing: [flowName: %s flowGroup: 
%s] - ",
+        jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY, "<<no flow 
name>>"),
+        jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, "<<no flow 
group>>"));
+  }
+
   @Override
   public void runJob(Properties jobProps, JobListener jobListener) throws 
JobException {
     try {
@@ -576,6 +593,13 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
       this.scheduledFlowSpecs.remove(specURI.toString());
       this.lastUpdatedTimeForFlowSpec.remove(specURI.toString());
       unscheduleJob(specURI.toString());
+      try {
+        FlowSpec spec = (FlowSpec) this.flowCatalog.get().getSpecs(specURI);
+        Properties properties = spec.getConfigAsProperties();
+        _log.info("{}Unscheduled Spec", jobSchedulerTracePrefixBuilder(properties));
+      } catch (SpecNotFoundException e) {
+        _log.warn("Unable to retrieve spec for URI {}", specURI, e); // keep the cause for diagnosis
+      }
     } else {
       throw new JobException(String.format(
           "Spec with URI: %s was not found in cache. May be it was cleaned, if 
not please clean it manually",
@@ -666,13 +690,22 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
 
     @Override
     public void executeImpl(JobExecutionContext context) throws 
JobExecutionException {
-      _log.info("Starting FlowSpec " + context.getJobDetail().getKey());
+      JobDetail jobDetail = context.getJobDetail();
+      _log.info("Starting FlowSpec " + jobDetail.getKey());
 
-      JobDataMap dataMap = context.getJobDetail().getJobDataMap();
+      JobDataMap dataMap = jobDetail.getJobDataMap();
       JobScheduler jobScheduler = (JobScheduler) 
dataMap.get(JOB_SCHEDULER_KEY);
       Properties jobProps = (Properties) dataMap.get(PROPERTIES_KEY);
       JobListener jobListener = (JobListener) dataMap.get(JOB_LISTENER_KEY);
 
+      // Obtain trigger timestamp from trigger to pass to jobProps
+      Trigger trigger = context.getTrigger();
+      // THIS current event has already fired if this method is called, so it now exists in <previousFireTime>
+      long triggerTimestampMillis = trigger.getPreviousFireTime().getTime();
+      jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
+          String.valueOf(triggerTimestampMillis));
+      Object nextTriggerTime = trigger.getNextFireTime() == null ? "<<none>>" : trigger.getNextFireTime().getTime(); // Quartz returns null once the trigger will not fire again; an NPE here would skip runJob below
+      _log.info("{}triggerTime: {} nextTriggerTime: {} - Job triggered by scheduler", jobSchedulerTracePrefixBuilder(jobProps), triggerTimestampMillis, nextTriggerTime);
       try {
         jobScheduler.runJob(jobProps, jobListener);
       } catch (Throwable t) {

Reply via email to