This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8ae0a6c0e [GOBBLIN-1885] Ensure All Logged Timestamps are in UTC
(#3747)
8ae0a6c0e is described below
commit 8ae0a6c0e10dfe1f12540aadc52ba84c8b255e1f
Author: umustafi <[email protected]>
AuthorDate: Wed Aug 23 11:15:08 2023 -0700
[GOBBLIN-1885] Ensure All Logged Timestamps are in UTC (#3747)
* ensure all logged timestamps are in utc
* use java.time class
* fix logging
* remove .* import
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../modules/orchestration/FlowTriggerHandler.java | 4 +-
.../scheduler/GobblinServiceJobScheduler.java | 70 ++++++++++++----------
2 files changed, 42 insertions(+), 32 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index cb40b8ad9..953e44c86 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -112,7 +112,7 @@ public class FlowTriggerHandler {
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
this.leaseObtainedCount.inc();
if (persistFlowAction(leaseObtainedStatus)) {
- log.info("Successfully persisted lease: [%s, eventTimestamp: %s] ",
leaseObtainedStatus.getFlowAction(),
+ log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ",
leaseObtainedStatus.getFlowAction(),
leaseObtainedStatus.getEventTimestamp());
return;
}
@@ -129,7 +129,7 @@ public class FlowTriggerHandler {
return;
} else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.NoLongerLeasingStatus) {
this.noLongerLeasingStatusCount.inc();
- log.debug("Received type of leaseAttemptStatus: [%s, eventTimestamp:
%s] ", leaseAttemptStatus.getClass().getName(),
+ log.debug("Received type of leaseAttemptStatus: [{}, eventTimestamp:
{}] ", leaseAttemptStatus.getClass().getName(),
eventTimeMillis);
return;
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index a9db2c9bd..7df02f3d5 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -17,9 +17,19 @@
package org.apache.gobblin.service.modules.scheduler;
+import com.codahale.metrics.MetricFilter;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.net.URI;
import java.text.ParseException;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
@@ -28,37 +38,12 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.TimeZone;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
-import org.apache.helix.HelixManager;
-import org.quartz.CronExpression;
-import org.quartz.DisallowConcurrentExecution;
-import org.quartz.InterruptableJob;
-import org.quartz.JobDataMap;
-import org.quartz.JobDetail;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.SchedulerException;
-import org.quartz.Trigger;
-import org.quartz.UnableToInterruptJobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.codahale.metrics.MetricFilter;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-
+import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
@@ -70,6 +55,7 @@ import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
@@ -82,13 +68,27 @@ import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagManager;
-import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.orchestration.FlowTriggerHandler;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
+import org.apache.helix.HelixManager;
+import org.quartz.CronExpression;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.InterruptableJob;
+import org.quartz.JobDataMap;
+import org.quartz.JobDetail;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.UnableToInterruptJobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX;
@@ -458,7 +458,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
protected void logNewlyScheduledJob(JobDetail job, Trigger trigger) {
Properties jobProps = (Properties) job.getJobDataMap().get(PROPERTIES_KEY);
log.info(jobSchedulerTracePrefixBuilder(jobProps) + "nextTriggerTime: {} -
Job newly scheduled",
- trigger.getNextFireTime().getTime());
+ asUTCEpochMillis(trigger.getNextFireTime()));
}
protected static String jobSchedulerTracePrefixBuilder(Properties jobProps) {
@@ -467,6 +467,16 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, "<<no flow
group>>"));
}
+ /**
+ * Takes a given Date object and converts the timezone to UTC before
returning the number of milliseconds since epoch
+ * @param date
+ */
+ public static long asUTCEpochMillis(Date date) {
+ return ZonedDateTime.of(
+ LocalDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault()),
+ ZoneOffset.UTC).toInstant().toEpochMilli();
+ }
+
@Override
public void runJob(Properties jobProps, JobListener jobListener) throws
JobException {
try {
@@ -716,11 +726,11 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
// Obtain trigger timestamp from trigger to pass to jobProps
Trigger trigger = context.getTrigger();
// THIS current event has already fired if this method is called, so it
now exists in <previousFireTime>
- long triggerTimestampMillis = trigger.getPreviousFireTime().getTime();
+ long triggerTimestampMillis =
asUTCEpochMillis(trigger.getPreviousFireTime());
jobProps.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
String.valueOf(triggerTimestampMillis));
_log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: {}
nextTriggerTime: {} - Job triggered by "
- + "scheduler", triggerTimestampMillis,
trigger.getNextFireTime().getTime());
+ + "scheduler", triggerTimestampMillis,
asUTCEpochMillis(trigger.getNextFireTime()));
try {
jobScheduler.runJob(jobProps, jobListener);
} catch (Throwable t) {