This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 24e63e8b1 [GOBBLIN-1875] Create Unique Trigger Keys (#3737)
24e63e8b1 is described below
commit 24e63e8b11990ab850f5b4446ae5db2de8514b78
Author: umustafi <[email protected]>
AuthorDate: Tue Aug 15 09:57:42 2023 -0700
[GOBBLIN-1875] Create Unique Trigger Keys (#3737)
* create unique trigger keys for each job
* add comment
* use optional suffix instead of random int to differentiate
* cleanup code in response to comments
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../runtime/api/MysqlMultiActiveLeaseArbiter.java | 2 +-
.../org/apache/gobblin/scheduler/JobScheduler.java | 11 ++---
.../apache/gobblin/scheduler/JobSchedulerTest.java | 48 ++++++++++++++++++++++
.../modules/orchestration/FlowTriggerHandler.java | 17 +++++++-
4 files changed, 70 insertions(+), 8 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index 39fb84844..33ab56189 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -273,7 +273,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
ResultSet resultSet = getInfoStatement.executeQuery();
try {
if (!resultSet.next()) {
- return Optional.absent();
+ return Optional.<GetEventInfoResult>absent();
}
return Optional.of(createGetInfoResult(resultSet));
} finally {
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
index c49de3c6a..a9761bff3 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
@@ -52,7 +52,6 @@ import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.UnableToInterruptJobException;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -396,7 +395,7 @@ public class JobScheduler extends AbstractIdleService {
try {
// Schedule the Quartz job with a trigger built from the job
configuration
- Trigger trigger = createTriggerForJob(job.getKey(), jobProps);
+ Trigger trigger = createTriggerForJob(job.getKey(), jobProps,
Optional.absent());
this.scheduler.getScheduler().scheduleJob(job, trigger);
logNewlyScheduledJob(job, trigger);
} catch (SchedulerException se) {
@@ -583,12 +582,14 @@ public class JobScheduler extends AbstractIdleService {
}
/**
- * Get a {@link org.quartz.Trigger} from the given job configuration
properties.
+ * Get a {@link org.quartz.Trigger} from the given job configuration properties. If triggerSuffix is present, appends
+ * "_" followed by the suffix to the job name used as the trigger name, so that multiple unique triggers can be associated with the same job.
*/
- public static Trigger createTriggerForJob(JobKey jobKey, Properties
jobProps) {
+ public static Trigger createTriggerForJob(JobKey jobKey, Properties
jobProps, Optional<String> triggerSuffix) {
// Build a trigger for the job with the given cron-style schedule
return TriggerBuilder.newTrigger()
- .withIdentity(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY),
+ .withIdentity(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)
+ + triggerSuffix.transform(s -> "_" + s).or(""),
Strings.nullToEmpty(jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY)))
.forJob(jobKey)
.withSchedule(CronScheduleBuilder.cronSchedule(jobProps.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY)))
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/scheduler/JobSchedulerTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/scheduler/JobSchedulerTest.java
new file mode 100644
index 000000000..b1f8c8cfd
--- /dev/null
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/scheduler/JobSchedulerTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.scheduler;
+
+import com.google.common.base.Optional;
+import java.util.Properties;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.junit.Assert;
+import org.quartz.JobKey;
+import org.quartz.Trigger;
+import org.testng.annotations.Test;
+
+
+public class JobSchedulerTest {
+  // Two triggers built from the same job key and job props must get distinct keys when only one is given a suffix.
+  @Test
+  public void testCreateUniqueTriggersForJob() {
+    String jobName = "flow123";
+    String jobGroup = "groupA";
+    JobKey jobKey = new JobKey(jobName, jobGroup);
+    Properties jobProps = new Properties();
+    jobProps.put(ConfigurationKeys.JOB_NAME_KEY, jobName);
+    jobProps.put(ConfigurationKeys.JOB_GROUP_KEY, jobGroup);
+    jobProps.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "0/2 * * * * ?");
+
+    Trigger trigger1 = JobScheduler.createTriggerForJob(jobKey, jobProps, Optional.absent());
+    Trigger trigger2 = JobScheduler.createTriggerForJob(jobKey, jobProps, Optional.of("suffix"));
+
+    // Compare keys by value: a reference check (!=) is true for any two distinct objects, even if the keys are equal.
+    Assert.assertNotEquals(trigger1.getKey(), trigger2.getKey());
+    // The suffix is joined to the job name with an underscore.
+    Assert.assertEquals(jobName + "_suffix", trigger2.getKey().getName());
+  }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index 90379e730..f9bd9c983 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -179,18 +179,31 @@ public class FlowTriggerHandler {
String.valueOf(originalEventTimeMillis));
JobKey key = new JobKey(flowAction.getFlowName(),
flowAction.getFlowGroup());
// Create a new trigger for the flow in job scheduler that is set to fire
at the minimum reminder wait time calculated
- Trigger trigger = JobScheduler.createTriggerForJob(key, jobProps);
+ Trigger trigger = JobScheduler.createTriggerForJob(key, jobProps,
+ Optional.of(createSuffixForJobTrigger(status.getEventTimeMillis())));
try {
log.info("Flow Trigger Handler - [{}, eventTimestamp: {}] - attempting
to schedule reminder for event {} in {} millis",
flowAction, originalEventTimeMillis, status.getEventTimeMillis(),
trigger.getNextFireTime());
this.schedulerService.getScheduler().scheduleJob(trigger);
} catch (SchedulerException e) {
- log.warn("Failed to add job reminder due to SchedulerException for job
{} trigger event {} ", key, status.getEventTimeMillis(), e);
+ log.warn("Failed to add job reminder due to SchedulerException for job
{} trigger event {} ", key,
+ status.getEventTimeMillis(), e);
+ // TODO: emit a metric for failed job reminders
}
log.info(String.format("Flow Trigger Handler - [%s, eventTimestamp: %s] -
SCHEDULED REMINDER for event %s in %s millis",
flowAction, originalEventTimeMillis, status.getEventTimeMillis(),
trigger.getNextFireTime()));
}
+ /**
+ * Create suffix to add to end of flow name to differentiate reminder triggers from the original job schedule trigger
+ * and ensure they are added to the scheduler.
+ * @param eventToRevisitMillis timestamp in milliseconds of the event the reminder trigger should revisit
+ * @return the string "reminder_for_" followed by eventToRevisitMillis
+ */
+ public static String createSuffixForJobTrigger(long eventToRevisitMillis) {
+ return "reminder_for_" + eventToRevisitMillis;
+ }
+
/**
* These methods should only be called from the Orchestrator or JobScheduler
classes as it directly adds jobs to the
* Quartz scheduler