This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new c578e5d28 Logs to improve multi-active debugging (#3735)
c578e5d28 is described below
commit c578e5d287c6167a89e24d2aea6fbe3b3a266aea
Author: umustafi <[email protected]>
AuthorDate: Fri Aug 11 08:36:14 2023 -0700
Logs to improve multi-active debugging (#3735)
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java | 2 ++
.../java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java | 2 +-
.../gobblin/service/modules/orchestration/FlowTriggerHandler.java | 2 --
.../service/modules/scheduler/GobblinServiceJobScheduler.java | 5 +++++
4 files changed, 8 insertions(+), 3 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index 964a29851..39fb84844 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -190,6 +190,8 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
@Override
public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
+ log.info("Multi-active scheduler about to handle trigger event: [{},
triggerEventTimestamp: {}]", flowAction,
+ eventTimeMillis);
// Query lease arbiter table about this flow action
Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction);
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index 0e68ea9aa..5e8daaa26 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -139,7 +139,7 @@ public abstract class HighLevelConsumer<K,V> extends
AbstractIdleService {
protected GobblinKafkaConsumerClient createConsumerClient(Config config) {
String kafkaConsumerClientClass =
config.getString(CONSUMER_CLIENT_FACTORY_CLASS_KEY);
- log.info("Creating consumer client of class {}", kafkaConsumerClientClass,
config);
+ log.info("Creating consumer client of class {} with config {}",
kafkaConsumerClientClass, config);
try {
Class clientFactoryClass = Class.forName(kafkaConsumerClientClass);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index 410350b88..90379e730 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -104,8 +104,6 @@ public class FlowTriggerHandler {
public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction
flowAction, long eventTimeMillis)
throws IOException {
if (multiActiveLeaseArbiter.isPresent()) {
- log.info("Multi-active scheduler about to handle trigger event: [{},
triggerEventTimestamp: {}]", flowAction,
- eventTimeMillis);
MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis);
if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index f71093f52..89fde1694 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -298,6 +298,11 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
cron = new CronExpression(cronExpression);
cron.setTimeZone(TimeZone.getTimeZone("UTC"));
Date nextValidTimeAfter = cron.getNextValidTimeAfter(new Date());
+ if (nextValidTimeAfter == null) {
+ log.warn("Calculation issue for next valid time for expression: {}.
Will default to true for within range",
+ cronExpression);
+ return true;
+ }
cal.setTime(nextValidTimeAfter);
long diff = cal.getTimeInMillis() - System.currentTimeMillis();
return (int) Math.round(diff / numMillisInADay) <
maxNumDaysToScheduleWithin;