This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new e2af88aa6 [GOBBLIN-1884] Delete Dag Action After Loading from Store Upon Startup (#3746)
e2af88aa6 is described below
commit e2af88aa625cf693fa29dcb2e8a373e6e3a1479c
Author: umustafi <[email protected]>
AuthorDate: Tue Aug 22 14:48:35 2023 -0700
[GOBBLIN-1884] Delete Dag Action After Loading from Store Upon Startup (#3746)
* delete dag action after loading from store upon startup
* Address review comments from PR#3743 and this one
* Add unit tests
* Ensure reminder event uses new JobKey
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
FlowTriggerHandlerTest.java | 74 ++++++++++++++++++++++
.../service/modules/orchestration/DagManager.java | 16 +++--
.../modules/orchestration/FlowTriggerHandler.java | 53 ++++++++++------
3 files changed, 116 insertions(+), 27 deletions(-)
diff --git a/FlowTriggerHandlerTest.java b/FlowTriggerHandlerTest.java
new file mode 100644
index 000000000..a4424bd7d
--- /dev/null
+++ b/FlowTriggerHandlerTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Properties;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+import org.junit.Assert;
+import org.quartz.JobDataMap;
+import org.testng.annotations.Test;
+
+
+public class FlowTriggerHandlerTest {
+  private static final String NEW_CRON_EXPRESSION = "0 0 0 ? * * 2024";
+  private static final long NEW_EVENT_TO_REVISIT = 123L;
+  private static final long NEW_EVENT_TO_TRIGGER = 456L;
+
+  /**
+   * Provides a Properties object that already has all three scheduler values (cron expression, event-to-revisit
+   * timestamp and event-to-trigger timestamp) set, and checks that each one is overwritten with the new value.
+   */
+  @Test
+  public void testUpdatePropsInJobDataMap() {
+    Properties originalProperties = new Properties();
+    originalProperties.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 0 0 ? * * 2050");
+    originalProperties.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY, "0");
+    originalProperties.setProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY, "1");
+
+    assertSchedulerPropsUpdated(originalProperties);
+  }
+
+  /**
+   * Provides an empty Properties object and checks that all three scheduler values (cron expression,
+   * event-to-revisit timestamp and event-to-trigger timestamp) get set.
+   */
+  @Test
+  public void testSetPropsInJobDataMap() {
+    assertSchedulerPropsUpdated(new Properties());
+  }
+
+  /**
+   * Stores {@code originalProperties} in a fresh {@link JobDataMap}, passes that map through
+   * {@link FlowTriggerHandler#updatePropsInJobDataMap} with this class's constants, and asserts that the Properties
+   * held by the returned map carry the new cron expression and both new event timestamps.
+   *
+   * @param originalProperties the starting Properties placed under {@code GobblinServiceJobScheduler.PROPERTIES_KEY}
+   */
+  private static void assertSchedulerPropsUpdated(Properties originalProperties) {
+    JobDataMap oldJobDataMap = new JobDataMap();
+    oldJobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, originalProperties);
+
+    JobDataMap newJobDataMap = FlowTriggerHandler.updatePropsInJobDataMap(oldJobDataMap, NEW_CRON_EXPRESSION,
+        NEW_EVENT_TO_REVISIT, NEW_EVENT_TO_TRIGGER);
+    Properties newProperties = (Properties) newJobDataMap.get(GobblinServiceJobScheduler.PROPERTIES_KEY);
+    // Fail with a descriptive message instead of an NPE if the Properties entry is missing.
+    Assert.assertNotNull("Properties missing from updated JobDataMap", newProperties);
+    // org.junit.Assert#assertEquals takes (expected, actual).
+    Assert.assertEquals(NEW_CRON_EXPRESSION, newProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
+    Assert.assertEquals(String.valueOf(NEW_EVENT_TO_REVISIT),
+        newProperties.getProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_REVISIT_TIMESTAMP_MILLIS_KEY));
+    Assert.assertEquals(String.valueOf(NEW_EVENT_TO_TRIGGER),
+        newProperties.getProperty(ConfigurationKeys.SCHEDULER_EVENT_TO_TRIGGER_TIMESTAMP_MILLIS_KEY));
+  }
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 2de9068d0..43d3ab84e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -21,6 +21,7 @@ import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
@@ -493,24 +494,27 @@ public class DagManager extends AbstractIdleService {
* dagStore, we compile the flow to generate the dag before calling
addDag(), handling any errors that may result in
* the process.
*/
- public void handleLaunchFlowEvent(DagActionStore.DagAction action) {
- FlowId flowId = action.getFlowId();
- FlowSpec spec;
+ public void handleLaunchFlowEvent(DagActionStore.DagAction launchAction) {
+ Preconditions.checkArgument(launchAction.getFlowActionType() ==
DagActionStore.FlowActionType.LAUNCH);
+ log.info("Handle launch flow event for action {}", launchAction);
+ FlowId flowId = launchAction.getFlowId();
try {
- log.info("Handle launch flow event for action: {}", action);
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
- spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+ FlowSpec spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
Optional<Dag<JobExecutionPlan>> optionalJobExecutionPlanDag =
this.flowCompilationValidationHelper.createExecutionPlanIfValid(spec);
if (optionalJobExecutionPlanDag.isPresent()) {
addDag(optionalJobExecutionPlanDag.get(), true, true);
}
+ // Upon handling the action, delete it so on leadership change this is
not duplicated
+ this.dagActionStore.get().deleteDagAction(launchAction);
} catch (URISyntaxException e) {
log.warn("Could not create URI object for flowId {} due to exception
{}", flowId, e.getMessage());
} catch (SpecNotFoundException e) {
log.warn("Spec not found for flowId {} due to exception {}", flowId,
e.getMessage());
} catch (IOException e) {
- log.warn("Failed to add Job Execution Plan for flowId {} due to
exception {}", flowId, e.getMessage());
+ log.warn("Failed to add Job Execution Plan for flowId {} OR delete dag
action from dagActionStore due to "
+ + "exception {}", flowId, e.getMessage());
} catch (InterruptedException e) {
log.warn("SpecCompiler failed to reach healthy state before compilation
of flowId {}. Exception: ", flowId, e);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index 669b8eccb..cb40b8ad9 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -44,7 +44,6 @@ import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.util.ConfigUtils;
import org.quartz.JobDataMap;
-import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
@@ -176,29 +175,24 @@ public class FlowTriggerHandler {
+ random.nextInt(schedulerMaxBackoffMillis));
JobKey origJobKey = new
JobKey(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "<<no job name>>"),
jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "<<no job
group>>"));
- // Triggers:job have an N:1 relationship but the job properties must
remain constant between both, which does not
- // allow us to keep track of additional properties for reminder events. By
reusing the same job key, we either
- // encounter an exception that the job already exists and cannot add it to
the scheduler or have to overwrite the
- // original job properties with the reminder event schedule. Thus, we
differentiate the job and trigger key from the
- // original event.
- JobKey newJobKey = new JobKey(origJobKey.getName() +
createSuffixForJobTrigger(status.getEventTimeMillis()),
- origJobKey.getGroup());
try {
if (!this.schedulerService.getScheduler().checkExists(origJobKey)) {
- log.warn("Attempting to set a reminder for a job that does not exist
in the scheduler. Key: {}", origJobKey);
+ log.warn("Skipping setting a reminder for a job that does not exist in
the scheduler. Key: {}", origJobKey);
this.jobDoesNotExistInSchedulerCount.inc();
return;
}
- JobDetailImpl jobDetail = (JobDetailImpl)
updatePropsInJobDetail(origJobKey, cronExpression,
+ JobKey reminderJobKey = constructReminderJobKey(origJobKey,
status.getEventTimeMillis());
+ JobDetailImpl jobDetail = createJobDetailForReminderEvent(origJobKey,
reminderJobKey, cronExpression,
status.getEventTimeMillis(), originalEventTimeMillis);
// Create a new trigger that is set to fire at the minimum reminder wait
time calculated
- Trigger trigger = JobScheduler.createTriggerForJob(newJobKey,
+ Trigger reminderTrigger =
JobScheduler.createTriggerForJob(reminderJobKey,
(Properties)
jobDetail.getJobDataMap().get(GobblinServiceJobScheduler.PROPERTIES_KEY),
Optional.absent());
- log.info("Flow Trigger Handler - [{}, eventTimestamp: {}] - attempting
to schedule reminder for event {} in {} "
- + "millis", flowAction, originalEventTimeMillis,
status.getEventTimeMillis(), trigger.getNextFireTime());
- this.schedulerService.getScheduler().scheduleJob(jobDetail, trigger);
+ // TODO: remove this comment once we've confirmed this function works
+ log.info("Flow Trigger Handler - [{}, eventTimestamp: {}] - attempting
to schedule reminder for event {}",
+ flowAction, originalEventTimeMillis, status.getEventTimeMillis());
+ this.schedulerService.getScheduler().scheduleJob(jobDetail,
reminderTrigger);
log.info("Flow Trigger Handler - [{}, eventTimestamp: {}] - SCHEDULED
REMINDER for event {} in {} millis",
- flowAction, originalEventTimeMillis, status.getEventTimeMillis(),
trigger.getNextFireTime());
+ flowAction, originalEventTimeMillis, status.getEventTimeMillis(),
reminderTrigger.getNextFireTime());
} catch (SchedulerException e) {
log.warn("Failed to add job reminder due to SchedulerException for job
{} trigger event {}. Exception: {}",
origJobKey, status.getEventTimeMillis(), e);
@@ -207,18 +201,35 @@ public class FlowTriggerHandler {
}
/**
- * Helper function used to extract JobDetail for job identified by the key
and update the Properties map to contain
- * the cron scheduler for the reminder event and information about the event
to revisit
- * @param key
+ * This function constructs the JobKey for a reminder event given the
original JobKey. Although multiple triggers can
+ * be created for one job, they are required to maintain the same jobProps
for all triggers. This does not allow us
+ * to keep track of additional properties needed for reminder events, so we
create a unique job and must differentiate
+ * the keys from the main job.
+ * @param originalJobKey
+ * @param eventToRevisitMillis
+ * @return
+ */
+ protected JobKey constructReminderJobKey(JobKey originalJobKey, long
eventToRevisitMillis) {
+ return new JobKey(originalJobKey.getName() +
createSuffixForJobTrigger(eventToRevisitMillis),
+ originalJobKey.getGroup());
+ }
+
+ /**
+   * Helper function used to extract the JobDetail for the job identified by originalKey and update it to be
+   * associated with the event to revisit. It replaces the jobKey with the provided reminderKey and updates the
+   * Properties map to contain the cron schedule for the reminder event and information about the event to revisit.
+ * @param originalKey
+ * @param reminderKey
* @param cronExpression
* @param reminderTimestampMillis
* @param originalEventTimeMillis
* @return
* @throws SchedulerException
*/
- protected JobDetail updatePropsInJobDetail(JobKey key, String
cronExpression, long reminderTimestampMillis,
- long originalEventTimeMillis) throws SchedulerException {
- JobDetailImpl jobDetail = (JobDetailImpl)
this.schedulerService.getScheduler().getJobDetail(key);
+ protected JobDetailImpl createJobDetailForReminderEvent(JobKey originalKey,
JobKey reminderKey,
+ String cronExpression, long reminderTimestampMillis, long
originalEventTimeMillis) throws SchedulerException {
+ JobDetailImpl jobDetail = (JobDetailImpl)
this.schedulerService.getScheduler().getJobDetail(originalKey);
+ jobDetail.setKey(reminderKey);
JobDataMap jobDataMap = jobDetail.getJobDataMap();
jobDataMap = updatePropsInJobDataMap(jobDataMap, cronExpression,
reminderTimestampMillis, originalEventTimeMillis);
jobDetail.setJobDataMap(jobDataMap);