This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5af6bca57 [GOBBLIN-1846] Validate Multi-active Scheduler with Logs (#3707)
5af6bca57 is described below
commit 5af6bca57df909e44b995e5b2d667c70e0399877
Author: umustafi <[email protected]>
AuthorDate: Thu Jun 22 14:04:05 2023 -0700
[GOBBLIN-1846] Validate Multi-active Scheduler with Logs (#3707)
* [GOBBLIN-1846] Validate Multi-active Scheduler with Logs
* Adds logging at 3 critical points to ensure no scheduled events are missed:
  - when jobs are first rescheduled (or updated)
  - each trigger of a new job
  - when jobs are unscheduled
* move flow related logic to GobblinServiceJobScheduler
* rename and reuse util
* fix typo to pass tests
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../gobblin/configuration/ConfigurationKeys.java | 4 +--
.../org/apache/gobblin/scheduler/JobScheduler.java | 13 +++-----
.../scheduler/GobblinServiceJobScheduler.java | 37 ++++++++++++++++++++--
3 files changed, 42 insertions(+), 12 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index b155e8089..2d60fd5c8 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -98,9 +98,9 @@ public class ConfigurationKeys {
// Scheduler lease determination store configuration
public static final String MYSQL_LEASE_ARBITER_PREFIX =
"MysqlMultiActiveLeaseArbiter";
public static final String MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE_KEY =
MYSQL_LEASE_ARBITER_PREFIX + ".constantsTable";
- public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE
= MYSQL_LEASE_ARBITER_PREFIX +
".gobblin_multi_active_scheduler_constants_store";
+ public static final String DEFAULT_MULTI_ACTIVE_SCHEDULER_CONSTANTS_DB_TABLE
= "gobblin_multi_active_scheduler_constants_store";
public static final String SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE_KEY
= MYSQL_LEASE_ARBITER_PREFIX + ".schedulerLeaseArbiterTable";
- public static final String
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE =
MYSQL_LEASE_ARBITER_PREFIX + ".gobblin_scheduler_lease_determination_store";
+ public static final String
DEFAULT_SCHEDULER_LEASE_DETERMINATION_STORE_DB_TABLE =
"gobblin_scheduler_lease_determination_store";
public static final String SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY =
"eventToRevisitTimestampMillis";
public static final String SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY =
"triggerEventTimestampMillis";
public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY =
MYSQL_LEASE_ARBITER_PREFIX + ".epsilonMillis";
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
index 56b1ac8c0..c49de3c6a 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
@@ -398,7 +398,7 @@ public class JobScheduler extends AbstractIdleService {
// Schedule the Quartz job with a trigger built from the job
configuration
Trigger trigger = createTriggerForJob(job.getKey(), jobProps);
this.scheduler.getScheduler().scheduleJob(job, trigger);
- LOG.info(String.format("Scheduled job %s. Next run: %s.", job.getKey(),
trigger.getNextFireTime()));
+ logNewlyScheduledJob(job, trigger);
} catch (SchedulerException se) {
LOG.error("Failed to schedule job " + jobName, se);
throw new JobException("Failed to schedule job " + jobName, se);
@@ -407,6 +407,10 @@ public class JobScheduler extends AbstractIdleService {
this.scheduledJobs.put(jobName, job.getKey());
}
+ protected void logNewlyScheduledJob(JobDetail job, Trigger trigger) {
+ LOG.info(String.format("Scheduled job %s. Next run: %s.", job.getKey(),
trigger.getNextFireTime()));
+ }
+
/**
* Unschedule and delete a job.
*
@@ -606,13 +610,6 @@ public class JobScheduler extends AbstractIdleService {
JobScheduler jobScheduler = (JobScheduler)
dataMap.get(JOB_SCHEDULER_KEY);
Properties jobProps = (Properties) dataMap.get(PROPERTIES_KEY);
JobListener jobListener = (JobListener) dataMap.get(JOB_LISTENER_KEY);
- // Obtain trigger timestamp from trigger to pass to jobProps
- Trigger trigger = context.getTrigger();
- // THIS current event has already fired if this method is called, so it
now exists in <previousFireTime>
- long triggerTimestampMillis = trigger.getPreviousFireTime().getTime();
-
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
- String.valueOf(triggerTimestampMillis));
-
try {
jobScheduler.runJob(jobProps, jobListener);
} catch (Throwable t) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index b62a869ba..5e0ac3988 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -30,14 +30,17 @@ import java.util.Properties;
import java.util.TimeZone;
import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.helix.HelixManager;
import org.quartz.CronExpression;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.InterruptableJob;
import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.SchedulerException;
+import org.quartz.Trigger;
import org.quartz.UnableToInterruptJobException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,6 +99,7 @@ import static
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFI
*/
@Alpha
@Singleton
+@Slf4j
public class GobblinServiceJobScheduler extends JobScheduler implements
SpecCatalogListener {
// Scheduler related configuration
@@ -442,6 +446,19 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
}
}
+ @Override
+ protected void logNewlyScheduledJob(JobDetail job, Trigger trigger) {
+ Properties jobProps = (Properties) job.getJobDataMap().get(PROPERTIES_KEY);
+ log.info(jobSchedulerTracePrefixBuilder(jobProps) + "nextTriggerTime: {} -
Job newly scheduled",
+ trigger.getNextFireTime());
+ }
+
+ protected static String jobSchedulerTracePrefixBuilder(Properties jobProps) {
+ return String.format("Scheduler trigger tracing: [flowName: %s flowGroup:
%s] - ",
+ jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY, "<<no flow
name>>"),
+ jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, "<<no flow
group>>"));
+ }
+
@Override
public void runJob(Properties jobProps, JobListener jobListener) throws
JobException {
try {
@@ -576,6 +593,13 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
this.scheduledFlowSpecs.remove(specURI.toString());
this.lastUpdatedTimeForFlowSpec.remove(specURI.toString());
unscheduleJob(specURI.toString());
+ try {
+ FlowSpec spec = (FlowSpec) this.flowCatalog.get().getSpecs(specURI);
+ Properties properties = spec.getConfigAsProperties();
+ _log.info(jobSchedulerTracePrefixBuilder(properties) + "Unscheduled
Spec");
+ } catch (SpecNotFoundException e) {
+ _log.warn("Unable to retrieve spec for URI {}", specURI);
+ }
} else {
throw new JobException(String.format(
"Spec with URI: %s was not found in cache. May be it was cleaned, if
not please clean it manually",
@@ -666,13 +690,22 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
@Override
public void executeImpl(JobExecutionContext context) throws
JobExecutionException {
- _log.info("Starting FlowSpec " + context.getJobDetail().getKey());
+ JobDetail jobDetail = context.getJobDetail();
+ _log.info("Starting FlowSpec " + jobDetail.getKey());
- JobDataMap dataMap = context.getJobDetail().getJobDataMap();
+ JobDataMap dataMap = jobDetail.getJobDataMap();
JobScheduler jobScheduler = (JobScheduler)
dataMap.get(JOB_SCHEDULER_KEY);
Properties jobProps = (Properties) dataMap.get(PROPERTIES_KEY);
JobListener jobListener = (JobListener) dataMap.get(JOB_LISTENER_KEY);
+ // Obtain trigger timestamp from trigger to pass to jobProps
+ Trigger trigger = context.getTrigger();
+ // THIS current event has already fired if this method is called, so it
now exists in <previousFireTime>
+ long triggerTimestampMillis = trigger.getPreviousFireTime().getTime();
+
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
+ String.valueOf(triggerTimestampMillis));
+ _log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: {}
nextTriggerTime: {} - Job triggered by "
+ + "scheduler", triggerTimestampMillis,
trigger.getNextFireTime().getTime());
try {
jobScheduler.runJob(jobProps, jobListener);
} catch (Throwable t) {