This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7a75118f3 [GOBBLIN-1902] Log intermittent NPE from ExecuteImpl (#3767)
7a75118f3 is described below
commit 7a75118f3e56fbe925e62d45bf56f5c4b959bd56
Author: umustafi <[email protected]>
AuthorDate: Tue Sep 12 15:11:35 2023 -0700
[GOBBLIN-1902] Log intermittent NPE from ExecuteImpl (#3767)
* Log NPE in executeImpl
* Include stack trace in logging
* change stacktrace print
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../scheduler/GobblinServiceJobScheduler.java | 62 ++++++++++++----------
1 file changed, 33 insertions(+), 29 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 3b983af47..ad90d7188 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -728,38 +728,42 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
@Override
public void executeImpl(JobExecutionContext context) throws
JobExecutionException {
- JobDetail jobDetail = context.getJobDetail();
- _log.info("Starting FlowSpec " + jobDetail.getKey());
-
- JobDataMap dataMap = jobDetail.getJobDataMap();
- JobScheduler jobScheduler = (JobScheduler)
dataMap.get(JOB_SCHEDULER_KEY);
- Properties jobProps = (Properties) dataMap.get(PROPERTIES_KEY);
- JobListener jobListener = (JobListener) dataMap.get(JOB_LISTENER_KEY);
-
- // Obtain trigger timestamp from trigger to pass to jobProps
- Trigger trigger = context.getTrigger();
- // THIS current event has already fired if this method is called, so it
now exists in <previousFireTime>
- long triggerTimeMillis = asUTCEpochMillis(trigger.getPreviousFireTime());
- // If the trigger is a reminder type event then utilize the trigger time
saved in job properties rather than the
- // actual firing time
- if (jobDetail.getKey().getName().contains("reminder")) {
- String preservedConsensusEventTime = jobProps.getProperty(
-
ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY, "0");
- String expectedReminderTime = jobProps.getProperty(
- ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY,
"0");
- _log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: {}
expectedReminderTime: {} - Reminder job "
- + "triggered by scheduler at {}", preservedConsensusEventTime,
expectedReminderTime, triggerTimeMillis);
- // TODO: add a metric if expected reminder time far exceeds system time
-
jobProps.setProperty(ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
preservedConsensusEventTime);
- } else {
-
jobProps.setProperty(ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
- String.valueOf(triggerTimeMillis));
- _log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: {}
nextTriggerTime: {} - Job triggered by "
- + "scheduler", triggerTimeMillis,
asUTCEpochMillis(trigger.getNextFireTime()));
- }
try {
+ // TODO: move this out of the try clause after location NPE source
+ JobDetail jobDetail = context.getJobDetail();
+ _log.info("Starting FlowSpec " + jobDetail.getKey());
+
+ JobDataMap dataMap = jobDetail.getJobDataMap();
+ JobScheduler jobScheduler = (JobScheduler)
dataMap.get(JOB_SCHEDULER_KEY);
+ Properties jobProps = (Properties) dataMap.get(PROPERTIES_KEY);
+ JobListener jobListener = (JobListener) dataMap.get(JOB_LISTENER_KEY);
+
+ // Obtain trigger timestamp from trigger to pass to jobProps
+ Trigger trigger = context.getTrigger();
+ // THIS current event has already fired if this method is called, so
it now exists in <previousFireTime>
+ long triggerTimeMillis =
asUTCEpochMillis(trigger.getPreviousFireTime());
+ // If the trigger is a reminder type event then utilize the trigger
time saved in job properties rather than the
+ // actual firing time
+ if (jobDetail.getKey().getName().contains("reminder")) {
+ String preservedConsensusEventTime = jobProps.getProperty(
+
ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY, "0");
+ String expectedReminderTime = jobProps.getProperty(
+ ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY,
"0");
+ _log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime:
{} expectedReminderTime: {} - Reminder job "
+ + "triggered by scheduler at {}", preservedConsensusEventTime,
expectedReminderTime, triggerTimeMillis);
+ // TODO: add a metric if expected reminder time far exceeds system
time
+
jobProps.setProperty(ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
preservedConsensusEventTime);
+ } else {
+
jobProps.setProperty(ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
+ String.valueOf(triggerTimeMillis));
+ _log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime:
{} nextTriggerTime: {} - Job triggered by "
+ + "scheduler", triggerTimeMillis,
asUTCEpochMillis(trigger.getNextFireTime()));
+ }
jobScheduler.runJob(jobProps, jobListener);
} catch (Throwable t) {
+ if (t instanceof NullPointerException) {
+ log.warn("NullPointerException encountered while trying to execute
flow. Message: " + t.getMessage(), t);
+ }
throw new JobExecutionException(t);
} finally {
scheduledFlows.mark();