This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ae0a6c0e [GOBBLIN-1885] Ensure All Logged Timestamps are in UTC 
(#3747)
8ae0a6c0e is described below

commit 8ae0a6c0e10dfe1f12540aadc52ba84c8b255e1f
Author: umustafi <[email protected]>
AuthorDate: Wed Aug 23 11:15:08 2023 -0700

    [GOBBLIN-1885] Ensure All Logged Timestamps are in UTC (#3747)
    
    * ensure all logged timestamps are in utc
    
    * use java.time class
    
    * fix logging
    
    * remove .* import
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../modules/orchestration/FlowTriggerHandler.java  |  4 +-
 .../scheduler/GobblinServiceJobScheduler.java      | 70 ++++++++++++----------
 2 files changed, 42 insertions(+), 32 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index cb40b8ad9..953e44c86 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -112,7 +112,7 @@ public class FlowTriggerHandler {
         MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = 
(MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
         this.leaseObtainedCount.inc();
         if (persistFlowAction(leaseObtainedStatus)) {
-          log.info("Successfully persisted lease: [%s, eventTimestamp: %s] ", 
leaseObtainedStatus.getFlowAction(),
+          log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ", 
leaseObtainedStatus.getFlowAction(),
               leaseObtainedStatus.getEventTimestamp());
           return;
         }
@@ -129,7 +129,7 @@ public class FlowTriggerHandler {
         return;
       } else if (leaseAttemptStatus instanceof 
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
         this.noLongerLeasingStatusCount.inc();
-        log.debug("Received type of leaseAttemptStatus: [%s, eventTimestamp: 
%s] ", leaseAttemptStatus.getClass().getName(),
+        log.debug("Received type of leaseAttemptStatus: [{}, eventTimestamp: 
{}] ", leaseAttemptStatus.getClass().getName(),
             eventTimeMillis);
         return;
       }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index a9db2c9bd..7df02f3d5 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -17,9 +17,19 @@
 
 package org.apache.gobblin.service.modules.scheduler;
 
+import com.codahale.metrics.MetricFilter;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
 import java.net.URI;
 import java.text.ParseException;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
 import java.util.Calendar;
 import java.util.Collection;
 import java.util.Date;
@@ -28,37 +38,12 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 import java.util.TimeZone;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
-import org.apache.helix.HelixManager;
-import org.quartz.CronExpression;
-import org.quartz.DisallowConcurrentExecution;
-import org.quartz.InterruptableJob;
-import org.quartz.JobDataMap;
-import org.quartz.JobDetail;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.SchedulerException;
-import org.quartz.Trigger;
-import org.quartz.UnableToInterruptJobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.codahale.metrics.MetricFilter;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
 import javax.inject.Inject;
 import javax.inject.Named;
 import javax.inject.Singleton;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-
+import org.apache.commons.lang.StringUtils;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
@@ -70,6 +55,7 @@ import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecCatalogListener;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
 import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
@@ -82,13 +68,27 @@ import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
-import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PropertiesUtils;
+import org.apache.helix.HelixManager;
+import org.quartz.CronExpression;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.InterruptableJob;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.UnableToInterruptJobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static 
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX;
 
@@ -458,7 +458,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
   protected void logNewlyScheduledJob(JobDetail job, Trigger trigger) {
     Properties jobProps = (Properties) job.getJobDataMap().get(PROPERTIES_KEY);
     log.info(jobSchedulerTracePrefixBuilder(jobProps) + "nextTriggerTime: {} - 
Job newly scheduled",
-         trigger.getNextFireTime().getTime());
+         asUTCEpochMillis(trigger.getNextFireTime()));
   }
 
   protected static String jobSchedulerTracePrefixBuilder(Properties jobProps) {
@@ -467,6 +467,16 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
         jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, "<<no flow 
group>>"));
   }
 
+  /**
+   * Takes a given Date object and converts the timezone to UTC before 
returning the number of millseconds since epoch
+   * @param date
+   */
+  public static long asUTCEpochMillis(Date date) {
+    return ZonedDateTime.of(
+        LocalDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault()),
+        ZoneOffset.UTC).toInstant().toEpochMilli();
+  }
+
   @Override
   public void runJob(Properties jobProps, JobListener jobListener) throws 
JobException {
     try {
@@ -716,11 +726,11 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
       // Obtain trigger timestamp from trigger to pass to jobProps
       Trigger trigger = context.getTrigger();
       // THIS current event has already fired if this method is called, so it 
now exists in <previousFireTime>
-      long triggerTimestampMillis = trigger.getPreviousFireTime().getTime();
+      long triggerTimestampMillis = 
asUTCEpochMillis(trigger.getPreviousFireTime());
       
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
           String.valueOf(triggerTimestampMillis));
       _log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: {} 
nextTriggerTime: {} - Job triggered by "
-              + "scheduler", triggerTimestampMillis, 
trigger.getNextFireTime().getTime());
+              + "scheduler", triggerTimestampMillis, 
asUTCEpochMillis(trigger.getNextFireTime()));
       try {
         jobScheduler.runJob(jobProps, jobListener);
       } catch (Throwable t) {

Reply via email to