This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a75118f3 [GOBBLIN-1902] Log intermittent NPE from ExecuteImpl (#3767)
7a75118f3 is described below

commit 7a75118f3e56fbe925e62d45bf56f5c4b959bd56
Author: umustafi <[email protected]>
AuthorDate: Tue Sep 12 15:11:35 2023 -0700

    [GOBBLIN-1902] Log intermittent NPE from ExecuteImpl (#3767)
    
    * Log NPE in executeImpl
    
    * Include stack trace in logging
    
    * change stacktrace print
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../scheduler/GobblinServiceJobScheduler.java      | 62 ++++++++++++----------
 1 file changed, 33 insertions(+), 29 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 3b983af47..ad90d7188 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -728,38 +728,42 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
 
     @Override
     public void executeImpl(JobExecutionContext context) throws 
JobExecutionException {
-      JobDetail jobDetail = context.getJobDetail();
-      _log.info("Starting FlowSpec " + jobDetail.getKey());
-
-      JobDataMap dataMap = jobDetail.getJobDataMap();
-      JobScheduler jobScheduler = (JobScheduler) 
dataMap.get(JOB_SCHEDULER_KEY);
-      Properties jobProps = (Properties) dataMap.get(PROPERTIES_KEY);
-      JobListener jobListener = (JobListener) dataMap.get(JOB_LISTENER_KEY);
-
-      // Obtain trigger timestamp from trigger to pass to jobProps
-      Trigger trigger = context.getTrigger();
-      // THIS current event has already fired if this method is called, so it 
now exists in <previousFireTime>
-      long triggerTimeMillis = asUTCEpochMillis(trigger.getPreviousFireTime());
-      // If the trigger is a reminder type event then utilize the trigger time 
saved in job properties rather than the
-      // actual firing time
-      if (jobDetail.getKey().getName().contains("reminder")) {
-        String preservedConsensusEventTime = jobProps.getProperty(
-            
ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY, "0");
-        String expectedReminderTime = jobProps.getProperty(
-            ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY, 
"0");
-        _log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: {} 
expectedReminderTime: {} - Reminder job "
-            + "triggered by scheduler at {}", preservedConsensusEventTime, 
expectedReminderTime, triggerTimeMillis);
-        // TODO: add a metric if expected reminder time far exceeds system time
-        
jobProps.setProperty(ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
 preservedConsensusEventTime);
-      } else {
-        
jobProps.setProperty(ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
-            String.valueOf(triggerTimeMillis));
-        _log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: {} 
nextTriggerTime: {} - Job triggered by "
-            + "scheduler", triggerTimeMillis, 
asUTCEpochMillis(trigger.getNextFireTime()));
-      }
       try {
+        // TODO: move this out of the try clause after location NPE source
+        JobDetail jobDetail = context.getJobDetail();
+        _log.info("Starting FlowSpec " + jobDetail.getKey());
+
+        JobDataMap dataMap = jobDetail.getJobDataMap();
+        JobScheduler jobScheduler = (JobScheduler) 
dataMap.get(JOB_SCHEDULER_KEY);
+        Properties jobProps = (Properties) dataMap.get(PROPERTIES_KEY);
+        JobListener jobListener = (JobListener) dataMap.get(JOB_LISTENER_KEY);
+
+        // Obtain trigger timestamp from trigger to pass to jobProps
+        Trigger trigger = context.getTrigger();
+        // THIS current event has already fired if this method is called, so 
it now exists in <previousFireTime>
+        long triggerTimeMillis = 
asUTCEpochMillis(trigger.getPreviousFireTime());
+        // If the trigger is a reminder type event then utilize the trigger 
time saved in job properties rather than the
+        // actual firing time
+        if (jobDetail.getKey().getName().contains("reminder")) {
+          String preservedConsensusEventTime = jobProps.getProperty(
+              
ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY, "0");
+          String expectedReminderTime = jobProps.getProperty(
+              ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY, 
"0");
+          _log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: 
{} expectedReminderTime: {} - Reminder job "
+              + "triggered by scheduler at {}", preservedConsensusEventTime, 
expectedReminderTime, triggerTimeMillis);
+          // TODO: add a metric if expected reminder time far exceeds system 
time
+          
jobProps.setProperty(ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
 preservedConsensusEventTime);
+        } else {
+          
jobProps.setProperty(ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
+              String.valueOf(triggerTimeMillis));
+          _log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: 
{} nextTriggerTime: {} - Job triggered by "
+              + "scheduler", triggerTimeMillis, 
asUTCEpochMillis(trigger.getNextFireTime()));
+        }
         jobScheduler.runJob(jobProps, jobListener);
       } catch (Throwable t) {
+        if (t instanceof NullPointerException) {
+          log.warn("NullPointerException encountered while trying to execute 
flow. Message: " + t.getMessage(), t);
+        }
         throw new JobExecutionException(t);
       } finally {
         scheduledFlows.mark();

Reply via email to