This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 24e63e8b1 [GOBBLIN-1875] Create Unique Trigger Keys  (#3737)
24e63e8b1 is described below

commit 24e63e8b11990ab850f5b4446ae5db2de8514b78
Author: umustafi <[email protected]>
AuthorDate: Tue Aug 15 09:57:42 2023 -0700

    [GOBBLIN-1875] Create Unique Trigger Keys  (#3737)
    
    * create unique trigger keys for each job
    
    * add comment
    
    * use optional suffix instead of random int to differentiate
    
    * cleanup code in response to comments
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../runtime/api/MysqlMultiActiveLeaseArbiter.java  |  2 +-
 .../org/apache/gobblin/scheduler/JobScheduler.java | 11 ++---
 .../apache/gobblin/scheduler/JobSchedulerTest.java | 48 ++++++++++++++++++++++
 .../modules/orchestration/FlowTriggerHandler.java  | 17 +++++++-
 4 files changed, 70 insertions(+), 8 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index 39fb84844..33ab56189 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -273,7 +273,7 @@ public class MysqlMultiActiveLeaseArbiter implements 
MultiActiveLeaseArbiter {
           ResultSet resultSet = getInfoStatement.executeQuery();
           try {
             if (!resultSet.next()) {
-              return Optional.absent();
+              return Optional.<GetEventInfoResult>absent();
             }
             return Optional.of(createGetInfoResult(resultSet));
           } finally {
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
index c49de3c6a..a9761bff3 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java
@@ -52,7 +52,6 @@ import org.quartz.SchedulerException;
 import org.quartz.Trigger;
 import org.quartz.TriggerBuilder;
 import org.quartz.UnableToInterruptJobException;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -396,7 +395,7 @@ public class JobScheduler extends AbstractIdleService {
 
     try {
       // Schedule the Quartz job with a trigger built from the job 
configuration
-      Trigger trigger = createTriggerForJob(job.getKey(), jobProps);
+      Trigger trigger = createTriggerForJob(job.getKey(), jobProps, 
Optional.absent());
       this.scheduler.getScheduler().scheduleJob(job, trigger);
       logNewlyScheduledJob(job, trigger);
     } catch (SchedulerException se) {
@@ -583,12 +582,14 @@ public class JobScheduler extends AbstractIdleService {
   }
 
   /**
-   * Get a {@link org.quartz.Trigger} from the given job configuration 
properties.
+   * Get a {@link org.quartz.Trigger} from the given job configuration 
properties. If triggerSuffix is provided, appends
+   * it with a leading underscore to the job name so multiple unique triggers 
can be associated with the same job.
    */
-  public static Trigger createTriggerForJob(JobKey jobKey, Properties 
jobProps) {
+  public static Trigger createTriggerForJob(JobKey jobKey, Properties 
jobProps, Optional<String> triggerSuffix) {
     // Build a trigger for the job with the given cron-style schedule
     return TriggerBuilder.newTrigger()
-        .withIdentity(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY),
+        .withIdentity(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)
+            + triggerSuffix.transform(s -> "_" + s).or(""),
             
Strings.nullToEmpty(jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY)))
         .forJob(jobKey)
         
.withSchedule(CronScheduleBuilder.cronSchedule(jobProps.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY)))
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/scheduler/JobSchedulerTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/scheduler/JobSchedulerTest.java
new file mode 100644
index 000000000..b1f8c8cfd
--- /dev/null
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/scheduler/JobSchedulerTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.scheduler;
+
+import com.google.common.base.Optional;
+import java.util.Properties;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.junit.Assert;
+import org.quartz.JobKey;
+import org.quartz.Trigger;
+import org.testng.annotations.Test;
+
+
+public class JobSchedulerTest {
+  // Builds two triggers from the same job key and props; only the second is given a suffix. Their keys must differ by
+  // equals() (reference inequality would hold for any two key objects) and the second name must end with the suffix.
+  @Test
+  public void testCreateUniqueTriggersForJob() {
+    String jobName = "flow123";
+    String jobGroup = "groupA";
+    JobKey jobKey = new JobKey(jobName, jobGroup);
+    Properties jobProps = new Properties();
+    jobProps.put(ConfigurationKeys.JOB_NAME_KEY, jobName);
+    jobProps.put(ConfigurationKeys.JOB_GROUP_KEY, jobGroup);
+    jobProps.put(ConfigurationKeys.JOB_SCHEDULE_KEY, "0/2 * * * * ?");
+
+    Trigger trigger1 = JobScheduler.createTriggerForJob(jobKey, jobProps, 
Optional.absent());
+    Trigger trigger2 = JobScheduler.createTriggerForJob(jobKey, jobProps, 
Optional.of("suffix"));
+
+    Assert.assertFalse(trigger1.getKey().equals(trigger2.getKey()));
+    Assert.assertTrue(trigger2.getKey().getName().endsWith("suffix"));
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index 90379e730..f9bd9c983 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -179,18 +179,31 @@ public class FlowTriggerHandler {
         String.valueOf(originalEventTimeMillis));
     JobKey key = new JobKey(flowAction.getFlowName(), 
flowAction.getFlowGroup());
     // Create a new trigger for the flow in job scheduler that is set to fire 
at the minimum reminder wait time calculated
-    Trigger trigger = JobScheduler.createTriggerForJob(key, jobProps);
+    Trigger trigger = JobScheduler.createTriggerForJob(key, jobProps,
+        Optional.of(createSuffixForJobTrigger(status.getEventTimeMillis())));
     try {
       log.info("Flow Trigger Handler - [{}, eventTimestamp: {}] -  attempting 
to schedule reminder for event {} in {} millis",
           flowAction, originalEventTimeMillis, status.getEventTimeMillis(), 
trigger.getNextFireTime());
       this.schedulerService.getScheduler().scheduleJob(trigger);
     } catch (SchedulerException e) {
-      log.warn("Failed to add job reminder due to SchedulerException for job 
{} trigger event {} ", key, status.getEventTimeMillis(), e);
+      log.warn("Failed to add job reminder due to SchedulerException for job 
{} trigger event {} ", key,
+          status.getEventTimeMillis(), e);
+      // TODO: emit a metric for failed job reminders
     }
     log.info(String.format("Flow Trigger Handler - [%s, eventTimestamp: %s] - 
SCHEDULED REMINDER for event %s in %s millis",
         flowAction, originalEventTimeMillis, status.getEventTimeMillis(), 
trigger.getNextFireTime()));
   }
 
+  /**
+   * Create suffix to add to end of flow name to differentiate reminder 
triggers from the original job schedule trigger
+   * and ensure they are added to the scheduler.
+   * @param eventToRevisitMillis event time, in milliseconds, of the event the reminder is for
+   * @return the literal {@code reminder_for_} followed by {@code eventToRevisitMillis}
+   */
+  public static String createSuffixForJobTrigger(long eventToRevisitMillis) {
+    return "reminder_for_" + eventToRevisitMillis;
+  }
+
   /**
    * These methods should only be called from the Orchestrator or JobScheduler 
classes as it directly adds jobs to the
    * Quartz scheduler

Reply via email to