Repository: deltaspike Updated Branches: refs/heads/master 2e4106e05 -> f2e4dec80
DELTASPIKE-1081 support for configurable cron-expressions Project: http://git-wip-us.apache.org/repos/asf/deltaspike/repo Commit: http://git-wip-us.apache.org/repos/asf/deltaspike/commit/f2e4dec8 Tree: http://git-wip-us.apache.org/repos/asf/deltaspike/tree/f2e4dec8 Diff: http://git-wip-us.apache.org/repos/asf/deltaspike/diff/f2e4dec8 Branch: refs/heads/master Commit: f2e4dec801dd4ef68fc96ddd7b55fccb1d03459d Parents: 2e4106e Author: gpetracek <gpetra...@apache.org> Authored: Sun Feb 21 21:47:19 2016 +0100 Committer: gpetracek <gpetra...@apache.org> Committed: Sun Feb 21 21:47:49 2016 +0100 ---------------------------------------------------------------------- .../scheduler/impl/AbstractQuartzScheduler.java | 104 ++++++++++++++++--- .../impl/DynamicExpressionObserverJob.java | 103 ++++++++++++++++++ .../scheduler/impl/SchedulerBaseConfig.java | 8 ++ .../scheduler/impl/SchedulerExtension.java | 15 ++- 4 files changed, 213 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/deltaspike/blob/f2e4dec8/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/AbstractQuartzScheduler.java ---------------------------------------------------------------------- diff --git a/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/AbstractQuartzScheduler.java b/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/AbstractQuartzScheduler.java index bd85f93..f668b79 100644 --- a/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/AbstractQuartzScheduler.java +++ b/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/AbstractQuartzScheduler.java @@ -19,8 +19,10 @@ package org.apache.deltaspike.scheduler.impl; import org.apache.deltaspike.cdise.api.ContextControl; +import org.apache.deltaspike.core.api.config.ConfigResolver; import org.apache.deltaspike.core.api.provider.BeanProvider; import org.apache.deltaspike.core.api.provider.DependentProvider; +import org.apache.deltaspike.core.util.ClassDeactivationUtils; import org.apache.deltaspike.core.util.ClassUtils; import org.apache.deltaspike.core.util.ExceptionUtils; import org.apache.deltaspike.core.util.PropertyFileUtils; @@ -44,12 +46,14 @@ import org.quartz.impl.StdSchedulerFactory; import java.io.InputStream; import java.lang.annotation.Annotation; +import java.util.Arrays; import java.util.Collections; import java.util.Enumeration; import java.util.List; import java.util.Properties; import java.util.ResourceBundle; import java.util.Stack; +import java.util.UUID; import java.util.logging.Level; import java.util.logging.Logger; @@ -191,11 +195,7 @@ public abstract class AbstractQuartzScheduler<T> implements Scheduler<T> .withIdentity(jobKey) .build(); - trigger = TriggerBuilder.newTrigger() - .withSchedule(CronScheduleBuilder.cronSchedule(scheduled.cronExpression())) - .build(); - - this.scheduler.scheduleJob(jobDetail, trigger); + scheduleNewJob(scheduled, jobKey, jobDetail); } else if (scheduled.overrideOnStartup()) { @@ -203,12 +203,7 @@ public abstract class AbstractQuartzScheduler<T> implements Scheduler<T> if (existingTriggers == null || existingTriggers.isEmpty()) { - //TODO re-visit it - trigger = TriggerBuilder.newTrigger() - .withSchedule(CronScheduleBuilder.cronSchedule(scheduled.cronExpression())) - .build(); - - this.scheduler.scheduleJob(jobDetail, trigger); + scheduleNewJob(scheduled, jobKey, jobDetail); return; } @@ -220,12 +215,21 @@ public abstract class AbstractQuartzScheduler<T> implements Scheduler<T> trigger = existingTriggers.iterator().next(); - trigger = TriggerBuilder.newTrigger() - .withIdentity(trigger.getKey()) - .withSchedule(CronScheduleBuilder.cronSchedule(scheduled.cronExpression())) - .build(); + if (scheduled.cronExpression().startsWith("{") && scheduled.cronExpression().endsWith("}")) + { + this.scheduler.unscheduleJobs(Arrays.asList(trigger.getKey())); + + scheduleNewJob(scheduled, jobKey, jobDetail); + } + else + { + trigger = TriggerBuilder.newTrigger() + .withIdentity(trigger.getKey()) + .withSchedule(CronScheduleBuilder.cronSchedule(scheduled.cronExpression())) + .build(); - this.scheduler.rescheduleJob(trigger.getKey(), trigger); + this.scheduler.rescheduleJob(trigger.getKey(), trigger); + } } else { @@ -239,6 +243,74 @@ public abstract class AbstractQuartzScheduler<T> implements Scheduler<T> } } + private void scheduleNewJob(Scheduled scheduled, JobKey jobKey, JobDetail jobDetail) throws SchedulerException + { + String cronExpression = evaluateExpression(scheduled); + this.scheduler.scheduleJob(jobDetail, createTrigger(scheduled, jobKey, cronExpression)); + } + + private Trigger createTrigger(Scheduled scheduled, JobKey jobKey, String cronExpression) throws SchedulerException + { + UUID triggerKey = UUID.randomUUID(); + + if (!scheduled.cronExpression().endsWith(cronExpression)) + { + createExpressionObserverJob(jobKey, triggerKey, scheduled.cronExpression(), cronExpression); + } + + Trigger trigger = TriggerBuilder.newTrigger() + .forJob(jobKey) + .withIdentity(triggerKey.toString()) + .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)) + .build(); + return trigger; + } + + private void createExpressionObserverJob( + JobKey jobKey, UUID triggerKey, String configExpression, String cronExpression) throws SchedulerException + { + if (!ClassDeactivationUtils.isActivated(DynamicExpressionObserverJob.class)) + { + return; + } + + JobKey observerJobKey = + new JobKey(jobKey.getName() + DynamicExpressionObserverJob.OBSERVER_POSTFIX, jobKey.getGroup()); + + JobDetail jobDetail = JobBuilder.newJob(DynamicExpressionObserverJob.class) + .usingJobData(DynamicExpressionObserverJob.CONFIG_EXPRESSION_KEY, configExpression) + .usingJobData(DynamicExpressionObserverJob.TRIGGER_ID_KEY, triggerKey.toString()) + .usingJobData(DynamicExpressionObserverJob.ACTIVE_CRON_EXPRESSION_KEY, cronExpression) + .withDescription("Config observer for: " + jobKey) + .withIdentity(observerJobKey) + .build(); + + Trigger trigger = TriggerBuilder.newTrigger() + .forJob(observerJobKey) + .withSchedule(CronScheduleBuilder.cronSchedule( + SchedulerBaseConfig.JobCustomization.DYNAMIC_EXPRESSION_OBSERVER_INTERVAL)) + .build(); + + this.scheduler.scheduleJob(jobDetail, trigger); + } + + private String evaluateExpression(Scheduled scheduled) + { + String expression = scheduled.cronExpression(); + + if (expression.startsWith("{") && expression.endsWith("}")) + { + String configKey = expression.substring(1, expression.length() - 1); + expression = ConfigResolver.getProjectStageAwarePropertyValue(configKey, null); + + if (expression == null) + { + throw new IllegalStateException("No config-value found for config-key: " + configKey); + } + } + return expression; + } + protected abstract Class<? extends Job> createFinalJobClass(Class<? extends T> jobClass); @Override http://git-wip-us.apache.org/repos/asf/deltaspike/blob/f2e4dec8/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/DynamicExpressionObserverJob.java ---------------------------------------------------------------------- diff --git a/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/DynamicExpressionObserverJob.java b/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/DynamicExpressionObserverJob.java new file mode 100644 index 0000000..bac885c --- /dev/null +++ b/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/DynamicExpressionObserverJob.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.deltaspike.scheduler.impl; + +import org.apache.deltaspike.core.api.config.ConfigResolver; +import org.apache.deltaspike.core.api.provider.BeanProvider; +import org.apache.deltaspike.core.spi.activation.Deactivatable; +import org.apache.deltaspike.scheduler.spi.Scheduler; +import org.quartz.CronScheduleBuilder; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; +import org.quartz.JobDataMap; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.JobKey; +import org.quartz.PersistJobDataAfterExecution; +import org.quartz.SchedulerException; +import org.quartz.Trigger; +import org.quartz.TriggerBuilder; + +import javax.inject.Inject; +import java.util.logging.Logger; + +/** + * This job is only active, if configurable cron-expressions are used - e.g.: @Scheduled(cronExpression = "{myKey}"). + * It observes jobs with configurable cron-expressions and updates their job-triggers once a config-change was detected. + * Per default this job gets executed once per minute. That can be changed via config-entry: + * deltaspike.scheduler.dynamic-expression.observer-interval=[any valid cron-expression] + */ +@DisallowConcurrentExecution +@PersistJobDataAfterExecution +public class DynamicExpressionObserverJob implements Deactivatable, Job +{ + static final String CONFIG_EXPRESSION_KEY = "ds_configExpression"; + static final String ACTIVE_CRON_EXPRESSION_KEY = "ds_activeCronExpression"; + static final String TRIGGER_ID_KEY = "ds_triggerKey"; + static final String OBSERVER_POSTFIX = "_observer"; + + private static final Logger LOG = Logger.getLogger(DynamicExpressionObserverJob.class.getName()); + + @Inject + private Scheduler<Job> scheduler; + + @Override + public void execute(JobExecutionContext context) throws JobExecutionException + { + JobDataMap jobDataMap = context.getMergedJobDataMap(); + String configExpression = jobDataMap.getString(CONFIG_EXPRESSION_KEY); + String triggerId = jobDataMap.getString(TRIGGER_ID_KEY); + String activeCronExpression = jobDataMap.getString(ACTIVE_CRON_EXPRESSION_KEY); + + String configKey = configExpression.substring(1, configExpression.length() - 1); + String configuredValue = ConfigResolver.getPropertyAwarePropertyValue(configKey, activeCronExpression); + + if (!activeCronExpression.equals(configuredValue)) + { + //both #put calls are needed currently + context.getJobDetail().getJobDataMap().put(ACTIVE_CRON_EXPRESSION_KEY, configuredValue); + context.getTrigger().getJobDataMap().put(ACTIVE_CRON_EXPRESSION_KEY, configuredValue); + + BeanProvider.injectFields(this); + + JobKey observerJobKey = context.getJobDetail().getKey(); + String observedJobName = observerJobKey.getName() + .substring(0, observerJobKey.getName().length() - OBSERVER_POSTFIX.length()); + JobKey observedJobKey = new JobKey(observedJobName, observerJobKey.getGroup()); + + Trigger trigger = TriggerBuilder.newTrigger() + .withIdentity(triggerId) + .forJob(observedJobName, observedJobKey.getGroup()) + .withSchedule(CronScheduleBuilder.cronSchedule(configuredValue)) + .build(); + + //use rescheduleJob instead of delete + add + //(unwrap is ok here, because this class will only get active in case of a quartz-scheduler) + org.quartz.Scheduler quartzScheduler = scheduler.unwrap(org.quartz.Scheduler.class); + try + { + quartzScheduler.rescheduleJob(trigger.getKey(), trigger); + } + catch (SchedulerException e) + { + LOG.warning("failed to updated cron-expression for " + observedJobKey); + } + } + } +} http://git-wip-us.apache.org/repos/asf/deltaspike/blob/f2e4dec8/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/SchedulerBaseConfig.java ---------------------------------------------------------------------- diff --git a/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/SchedulerBaseConfig.java b/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/SchedulerBaseConfig.java index 225891e..127020e 100644 --- a/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/SchedulerBaseConfig.java +++ b/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/SchedulerBaseConfig.java @@ -28,6 +28,8 @@ public interface SchedulerBaseConfig extends DeltaSpikeBaseConfig { String JOB_CLASS_NAME_KEY = "deltaspike.scheduler.job-class"; String RUNNABLE_ADAPTER_CLASS_NAME_KEY = "deltaspike.scheduler.runnable-adapter-class"; + String DYNAMIC_EXPRESSION_OBSERVER_INTERVAL_KEY = + "deltaspike.scheduler.dynamic-expression.observer-interval"; //don't type it to class to keep quartz optional String DEFAULT_JOB_FACTORY_CLASS_NAME = ConfigResolver.resolve("deltaspike.scheduler.DefaultJobFactory") @@ -46,6 +48,12 @@ public interface SchedulerBaseConfig extends DeltaSpikeBaseConfig .withCurrentProjectStage(true) .withDefault("org.apache.deltaspike.scheduler.impl.JobRunnableAdapter") .getValue(); + + String DYNAMIC_EXPRESSION_OBSERVER_INTERVAL = + ConfigResolver.resolve(DYNAMIC_EXPRESSION_OBSERVER_INTERVAL_KEY) + .withCurrentProjectStage(true) + .withDefault("0 0/1 * * * ?") + .getValue(); } String SCHEDULER_CONFIG_FILE = ConfigResolver.resolve("deltaspike.scheduler.quartz_config-file") http://git-wip-us.apache.org/repos/asf/deltaspike/blob/f2e4dec8/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/SchedulerExtension.java ---------------------------------------------------------------------- diff --git a/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/SchedulerExtension.java b/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/SchedulerExtension.java index 33a7489..1780373 100644 --- a/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/SchedulerExtension.java +++ b/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/SchedulerExtension.java @@ -45,6 +45,9 @@ public class SchedulerExtension implements Extension, Deactivatable { private static final Logger LOG = Logger.getLogger(SchedulerExtension.class.getName()); + //keep it as a string (needed by some containers - due to the imports) + private static Set<String> classNamesToVeto = new HashSet<String>(); + private Boolean isActivated = true; private Set<Class> foundManagedJobClasses = new HashSet<Class>(); @@ -53,6 +56,11 @@ public class SchedulerExtension implements Extension, Deactivatable private Class jobClass; + public SchedulerExtension() + { + classNamesToVeto.add("org.apache.deltaspike.scheduler.impl.DynamicExpressionObserverJob"); + } + protected void init(@Observes BeforeBeanDiscovery beforeBeanDiscovery) { this.isActivated = ClassDeactivationUtils.isActivated(getClass()); @@ -80,7 +88,7 @@ public class SchedulerExtension implements Extension, Deactivatable Class<X> beanClass = pat.getAnnotatedType().getJavaClass(); //see SchedulerProducer - if (Scheduler.class.isAssignableFrom(beanClass)) + if (Scheduler.class.isAssignableFrom(beanClass) || isInternalUnmanagedClass(beanClass)) { pat.veto(); return; @@ -98,6 +106,11 @@ public class SchedulerExtension implements Extension, Deactivatable } } + private <X> boolean isInternalUnmanagedClass(Class<X> beanClass) + { + return classNamesToVeto.contains(beanClass.getName()); + } + public <X> void scheduleJobs(@Observes AfterBeanDiscovery afterBeanDiscovery, BeanManager beanManager) { if (!this.isActivated)