This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new e2af88aa6 [GOBBLIN-1884] Delete Dag Action After Loading from Store 
Upon Startup (#3746)
e2af88aa6 is described below

commit e2af88aa625cf693fa29dcb2e8a373e6e3a1479c
Author: umustafi <[email protected]>
AuthorDate: Tue Aug 22 14:48:35 2023 -0700

    [GOBBLIN-1884] Delete Dag Action After Loading from Store Upon Startup 
(#3746)
    
    * delete dag action after loading from store upon startup
    
    * Address review comments from PR#3743 and this one
    
    * Add unit tests
    
    * Ensure reminder event uses new JobKey
    
    ---------
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 FlowTriggerHandlerTest.java                        | 74 ++++++++++++++++++++++
 .../service/modules/orchestration/DagManager.java  | 16 +++--
 .../modules/orchestration/FlowTriggerHandler.java  | 53 ++++++++++------
 3 files changed, 116 insertions(+), 27 deletions(-)

diff --git a/FlowTriggerHandlerTest.java b/FlowTriggerHandlerTest.java
new file mode 100644
index 000000000..a4424bd7d
--- /dev/null
+++ b/FlowTriggerHandlerTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Properties;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+import org.junit.Assert;
+import org.quartz.JobDataMap;
+import org.testng.annotations.Test;
+
+
+public class FlowTriggerHandlerTest {
+  String newCronExpression = "0 0 0 ? * * 2024";
+  long newEventToRevisit = 123L;
+  long newEventToTrigger = 456L;
+
+  /**
+   * Provides an input with all three values (cronExpression, 
reminderTimestamp, originalEventTime) set in the map
+   * Properties and checks that they are updated properly
+   */
+  @Test
+  public void testUpdatePropsInJobDataMap() {
+    JobDataMap oldJobDataMap = new JobDataMap();
+    Properties originalProperties = new Properties();
+    originalProperties.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 0 0 
? * * 2050");
+    
originalProperties.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY,
 "0");
+    
originalProperties.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY,
 "1");
+    oldJobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, 
originalProperties);
+
+    JobDataMap newJobDataMap = 
FlowTriggerHandler.updatePropsInJobDataMap(oldJobDataMap, newCronExpression,
+        newEventToRevisit, newEventToTrigger);
+    Properties newProperties = (Properties) 
newJobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
+    Assert.assertEquals(newCronExpression, 
newProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
+    Assert.assertEquals(String.valueOf(newEventToRevisit),
+        
newProperties.getProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY));
+    Assert.assertEquals(String.valueOf(newEventToTrigger),
+        
newProperties.getProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY));
+  }
+
+  /**
+   * Provides input with an empty Properties object and checks that the three 
values in question are set.
+   */
+  @Test
+  public void testSetPropsInJobDataMap() {
+    JobDataMap oldJobDataMap = new JobDataMap();
+    Properties originalProperties = new Properties();
+    oldJobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, 
originalProperties);
+    
+    JobDataMap newJobDataMap = 
FlowTriggerHandler.updatePropsInJobDataMap(oldJobDataMap, newCronExpression,
+        newEventToRevisit, newEventToTrigger);
+    Properties newProperties = (Properties) 
newJobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
+    Assert.assertEquals(newCronExpression, 
newProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
+    Assert.assertEquals(String.valueOf(newEventToRevisit),
+        
newProperties.getProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY));
+    Assert.assertEquals(String.valueOf(newEventToTrigger),
+        
newProperties.getProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY));
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 2de9068d0..43d3ab84e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -21,6 +21,7 @@ import com.codahale.metrics.Meter;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.eventbus.Subscribe;
@@ -493,24 +494,27 @@ public class DagManager extends AbstractIdleService {
    * dagStore, we compile the flow to generate the dag before calling 
addDag(), handling any errors that may result in
    * the process.
    */
-  public void handleLaunchFlowEvent(DagActionStore.DagAction action) {
-    FlowId flowId = action.getFlowId();
-    FlowSpec spec;
+  public void handleLaunchFlowEvent(DagActionStore.DagAction launchAction) {
+    Preconditions.checkArgument(launchAction.getFlowActionType() == 
DagActionStore.FlowActionType.LAUNCH);
+    log.info("Handle launch flow event for action {}", launchAction);
+    FlowId flowId = launchAction.getFlowId();
     try {
-      log.info("Handle launch flow event for action: {}", action);
       URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
-      spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+      FlowSpec spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
       Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
           
this.flowCompilationValidationHelper.createExecutionPlanIfValid(spec);
       if (optionalJobExecutionPlanDag.isPresent()) {
         addDag(optionalJobExecutionPlanDag.get(), true, true);
       }
+      // Upon handling the action, delete it so on leadership change this is 
not duplicated
+      this.dagActionStore.get().deleteDagAction(launchAction);
     } catch (URISyntaxException e) {
       log.warn("Could not create URI object for flowId {} due to exception 
{}", flowId, e.getMessage());
     } catch (SpecNotFoundException e) {
       log.warn("Spec not found for flowId {} due to exception {}", flowId, 
e.getMessage());
     } catch (IOException e) {
-      log.warn("Failed to add Job Execution Plan for flowId {} due to 
exception {}", flowId, e.getMessage());
+      log.warn("Failed to add Job Execution Plan for flowId {} OR delete dag 
action from dagActionStore due to "
+          + "exception {}", flowId, e.getMessage());
     } catch (InterruptedException e) {
       log.warn("SpecCompiler failed to reach healthy state before compilation 
of flowId {}. Exception: ", flowId, e);
     }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index 669b8eccb..cb40b8ad9 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -44,7 +44,6 @@ import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 import org.apache.gobblin.util.ConfigUtils;
 import org.quartz.JobDataMap;
-import org.quartz.JobDetail;
 import org.quartz.JobKey;
 import org.quartz.SchedulerException;
 import org.quartz.Trigger;
@@ -176,29 +175,24 @@ public class FlowTriggerHandler {
         + random.nextInt(schedulerMaxBackoffMillis));
     JobKey origJobKey = new 
JobKey(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "<<no job name>>"),
         jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "<<no job 
group>>"));
-    // Triggers:job have an N:1 relationship but the job properties must 
remain constant between both, which does not
-    // allow us to keep track of additional properties for reminder events. By 
reusing the same job key, we either
-    // encounter an exception that the job already exists and cannot add it to 
the scheduler or have to overwrite the
-    // original job properties with the reminder event schedule. Thus, we 
differentiate the job and trigger key from the
-    // original event.
-    JobKey newJobKey = new JobKey(origJobKey.getName() + 
createSuffixForJobTrigger(status.getEventTimeMillis()),
-        origJobKey.getGroup());
     try {
       if (!this.schedulerService.getScheduler().checkExists(origJobKey)) {
-        log.warn("Attempting to set a reminder for a job that does not exist 
in the scheduler. Key: {}", origJobKey);
+        log.warn("Skipping setting a reminder for a job that does not exist in 
the scheduler. Key: {}", origJobKey);
         this.jobDoesNotExistInSchedulerCount.inc();
         return;
       }
-      JobDetailImpl jobDetail = (JobDetailImpl) 
updatePropsInJobDetail(origJobKey, cronExpression,
+      JobKey reminderJobKey = constructReminderJobKey(origJobKey, 
status.getEventTimeMillis());
+      JobDetailImpl jobDetail = createJobDetailForReminderEvent(origJobKey, 
reminderJobKey, cronExpression,
           status.getEventTimeMillis(), originalEventTimeMillis);
       // Create a new trigger that is set to fire at the minimum reminder wait 
time calculated
-      Trigger trigger = JobScheduler.createTriggerForJob(newJobKey,
+      Trigger reminderTrigger = 
JobScheduler.createTriggerForJob(reminderJobKey,
           (Properties) 
jobDetail.getJobDataMap().get(GobblinServiceJobScheduler.PROPERTIES_KEY), 
Optional.absent());
-      log.info("Flow Trigger Handler - [{}, eventTimestamp: {}] -  attempting 
to schedule reminder for event {} in {} "
-              + "millis", flowAction, originalEventTimeMillis, 
status.getEventTimeMillis(), trigger.getNextFireTime());
-      this.schedulerService.getScheduler().scheduleJob(jobDetail, trigger);
+      // TODO: remove this comment once we've confirmed this function works
+      log.info("Flow Trigger Handler - [{}, eventTimestamp: {}] -  attempting 
to schedule reminder for event {}",
+          flowAction, originalEventTimeMillis, status.getEventTimeMillis());
+      this.schedulerService.getScheduler().scheduleJob(jobDetail, 
reminderTrigger);
       log.info("Flow Trigger Handler - [{}, eventTimestamp: {}] - SCHEDULED 
REMINDER for event {} in {} millis",
-          flowAction, originalEventTimeMillis, status.getEventTimeMillis(), 
trigger.getNextFireTime());
+          flowAction, originalEventTimeMillis, status.getEventTimeMillis(), 
reminderTrigger.getNextFireTime());
     } catch (SchedulerException e) {
       log.warn("Failed to add job reminder due to SchedulerException for job 
{} trigger event {}. Exception: {}",
           origJobKey, status.getEventTimeMillis(), e);
@@ -207,18 +201,35 @@ public class FlowTriggerHandler {
   }
 
   /**
-   * Helper function used to extract JobDetail for job identified by the key 
and update the Properties map to contain
-   * the cron scheduler for the reminder event and information about the event 
to revisit
-   * @param key
+   * This function constructs the JobKey for a reminder event given the 
original JobKey. Although multiple triggers can
+   * be created for one job, they are required to maintain the same jobProps 
for all triggers. This does not allow us
+   * to keep track of additional properties needed for reminder events, so we 
create a unique job and must differentiate
+   * the keys from the main job.
+   * @param originalJobKey
+   * @param eventToRevisitMillis
+   * @return
+   */
+  protected JobKey constructReminderJobKey(JobKey originalJobKey, long 
eventToRevisitMillis) {
+    return new JobKey(originalJobKey.getName() + 
createSuffixForJobTrigger(eventToRevisitMillis),
+        originalJobKey.getGroup());
+  }
+
+  /**
+   * Helper function used to extract JobDetail for job identified by the 
originalKey and update it to be associated with
+   * the event to revisit. It will update the jobKey to the reminderKey 
provided and the Properties map to
+   * contain the cron scheduler for the reminder event and information about 
the event to revisit
+   * @param originalKey
+   * @param reminderKey
    * @param cronExpression
    * @param reminderTimestampMillis
    * @param originalEventTimeMillis
    * @return
    * @throws SchedulerException
    */
-  protected JobDetail updatePropsInJobDetail(JobKey key, String 
cronExpression, long reminderTimestampMillis,
-      long originalEventTimeMillis) throws SchedulerException {
-    JobDetailImpl jobDetail = (JobDetailImpl) 
this.schedulerService.getScheduler().getJobDetail(key);
+  protected JobDetailImpl createJobDetailForReminderEvent(JobKey originalKey, 
JobKey reminderKey,
+      String cronExpression, long reminderTimestampMillis, long 
originalEventTimeMillis) throws SchedulerException {
+    JobDetailImpl jobDetail = (JobDetailImpl) 
this.schedulerService.getScheduler().getJobDetail(originalKey);
+    jobDetail.setKey(reminderKey);
     JobDataMap jobDataMap = jobDetail.getJobDataMap();
     jobDataMap = updatePropsInJobDataMap(jobDataMap, cronExpression, 
reminderTimestampMillis, originalEventTimeMillis);
     jobDetail.setJobDataMap(jobDataMap);

Reply via email to