Repository: deltaspike
Updated Branches:
  refs/heads/master 2e4106e05 -> f2e4dec80


DELTASPIKE-1081 support for configurable cron-expressions


Project: http://git-wip-us.apache.org/repos/asf/deltaspike/repo
Commit: http://git-wip-us.apache.org/repos/asf/deltaspike/commit/f2e4dec8
Tree: http://git-wip-us.apache.org/repos/asf/deltaspike/tree/f2e4dec8
Diff: http://git-wip-us.apache.org/repos/asf/deltaspike/diff/f2e4dec8

Branch: refs/heads/master
Commit: f2e4dec801dd4ef68fc96ddd7b55fccb1d03459d
Parents: 2e4106e
Author: gpetracek <gpetra...@apache.org>
Authored: Sun Feb 21 21:47:19 2016 +0100
Committer: gpetracek <gpetra...@apache.org>
Committed: Sun Feb 21 21:47:49 2016 +0100

----------------------------------------------------------------------
 .../scheduler/impl/AbstractQuartzScheduler.java | 104 ++++++++++++++++---
 .../impl/DynamicExpressionObserverJob.java      | 103 ++++++++++++++++++
 .../scheduler/impl/SchedulerBaseConfig.java     |   8 ++
 .../scheduler/impl/SchedulerExtension.java      |  15 ++-
 4 files changed, 213 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/deltaspike/blob/f2e4dec8/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/AbstractQuartzScheduler.java
----------------------------------------------------------------------
diff --git 
a/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/AbstractQuartzScheduler.java
 
b/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/AbstractQuartzScheduler.java
index bd85f93..f668b79 100644
--- 
a/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/AbstractQuartzScheduler.java
+++ 
b/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/AbstractQuartzScheduler.java
@@ -19,8 +19,10 @@
 package org.apache.deltaspike.scheduler.impl;
 
 import org.apache.deltaspike.cdise.api.ContextControl;
+import org.apache.deltaspike.core.api.config.ConfigResolver;
 import org.apache.deltaspike.core.api.provider.BeanProvider;
 import org.apache.deltaspike.core.api.provider.DependentProvider;
+import org.apache.deltaspike.core.util.ClassDeactivationUtils;
 import org.apache.deltaspike.core.util.ClassUtils;
 import org.apache.deltaspike.core.util.ExceptionUtils;
 import org.apache.deltaspike.core.util.PropertyFileUtils;
@@ -44,12 +46,14 @@ import org.quartz.impl.StdSchedulerFactory;
 
 import java.io.InputStream;
 import java.lang.annotation.Annotation;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.List;
 import java.util.Properties;
 import java.util.ResourceBundle;
 import java.util.Stack;
+import java.util.UUID;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -191,11 +195,7 @@ public abstract class AbstractQuartzScheduler<T> 
implements Scheduler<T>
                         .withIdentity(jobKey)
                         .build();
 
-                trigger = TriggerBuilder.newTrigger()
-                        
.withSchedule(CronScheduleBuilder.cronSchedule(scheduled.cronExpression()))
-                        .build();
-
-                this.scheduler.scheduleJob(jobDetail, trigger);
+                scheduleNewJob(scheduled, jobKey, jobDetail);
             }
             else if (scheduled.overrideOnStartup())
             {
@@ -203,12 +203,7 @@ public abstract class AbstractQuartzScheduler<T> 
implements Scheduler<T>
 
                 if (existingTriggers == null || existingTriggers.isEmpty())
                 {
-                    //TODO re-visit it
-                    trigger = TriggerBuilder.newTrigger()
-                            
.withSchedule(CronScheduleBuilder.cronSchedule(scheduled.cronExpression()))
-                            .build();
-
-                    this.scheduler.scheduleJob(jobDetail, trigger);
+                    scheduleNewJob(scheduled, jobKey, jobDetail);
                     return;
                 }
 
@@ -220,12 +215,21 @@ public abstract class AbstractQuartzScheduler<T> 
implements Scheduler<T>
 
                 trigger = existingTriggers.iterator().next();
 
-                trigger = TriggerBuilder.newTrigger()
-                        .withIdentity(trigger.getKey())
-                        
.withSchedule(CronScheduleBuilder.cronSchedule(scheduled.cronExpression()))
-                        .build();
+                if (scheduled.cronExpression().startsWith("{") && 
scheduled.cronExpression().endsWith("}"))
+                {
+                    
this.scheduler.unscheduleJobs(Arrays.asList(trigger.getKey()));
+
+                    scheduleNewJob(scheduled, jobKey, jobDetail);
+                }
+                else
+                {
+                    trigger = TriggerBuilder.newTrigger()
+                            .withIdentity(trigger.getKey())
+                            
.withSchedule(CronScheduleBuilder.cronSchedule(scheduled.cronExpression()))
+                            .build();
 
-                this.scheduler.rescheduleJob(trigger.getKey(), trigger);
+                    this.scheduler.rescheduleJob(trigger.getKey(), trigger);
+                }
             }
             else
             {
@@ -239,6 +243,74 @@ public abstract class AbstractQuartzScheduler<T> 
implements Scheduler<T>
         }
     }
 
+    private void scheduleNewJob(Scheduled scheduled, JobKey jobKey, JobDetail 
jobDetail) throws SchedulerException
+    {
+        String cronExpression = evaluateExpression(scheduled);
+        this.scheduler.scheduleJob(jobDetail, createTrigger(scheduled, jobKey, 
cronExpression));
+    }
+
+    private Trigger createTrigger(Scheduled scheduled, JobKey jobKey, String 
cronExpression) throws SchedulerException
+    {
+        UUID triggerKey = UUID.randomUUID();
+
+        if (!scheduled.cronExpression().endsWith(cronExpression))
+        {
+            createExpressionObserverJob(jobKey, triggerKey, 
scheduled.cronExpression(), cronExpression);
+        }
+
+        Trigger trigger = TriggerBuilder.newTrigger()
+                .forJob(jobKey)
+                .withIdentity(triggerKey.toString())
+                .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
+                .build();
+        return trigger;
+    }
+
+    private void createExpressionObserverJob(
+        JobKey jobKey, UUID triggerKey, String configExpression, String 
cronExpression) throws SchedulerException
+    {
+        if 
(!ClassDeactivationUtils.isActivated(DynamicExpressionObserverJob.class))
+        {
+            return;
+        }
+
+        JobKey observerJobKey =
+                new JobKey(jobKey.getName() + 
DynamicExpressionObserverJob.OBSERVER_POSTFIX, jobKey.getGroup());
+
+        JobDetail jobDetail  = 
JobBuilder.newJob(DynamicExpressionObserverJob.class)
+                
.usingJobData(DynamicExpressionObserverJob.CONFIG_EXPRESSION_KEY, 
configExpression)
+                .usingJobData(DynamicExpressionObserverJob.TRIGGER_ID_KEY, 
triggerKey.toString())
+                
.usingJobData(DynamicExpressionObserverJob.ACTIVE_CRON_EXPRESSION_KEY, 
cronExpression)
+                .withDescription("Config observer for: " + jobKey)
+                .withIdentity(observerJobKey)
+                .build();
+
+        Trigger trigger = TriggerBuilder.newTrigger()
+                .forJob(observerJobKey)
+                .withSchedule(CronScheduleBuilder.cronSchedule(
+                    
SchedulerBaseConfig.JobCustomization.DYNAMIC_EXPRESSION_OBSERVER_INTERVAL))
+                .build();
+
+        this.scheduler.scheduleJob(jobDetail, trigger);
+    }
+
+    private String evaluateExpression(Scheduled scheduled)
+    {
+        String expression = scheduled.cronExpression();
+
+        if (expression.startsWith("{") && expression.endsWith("}"))
+        {
+            String configKey = expression.substring(1, expression.length() - 
1);
+            expression = 
ConfigResolver.getProjectStageAwarePropertyValue(configKey, null);
+
+            if (expression == null)
+            {
+                throw new IllegalStateException("No config-value found for 
config-key: " + configKey);
+            }
+        }
+        return expression;
+    }
+
     protected abstract Class<? extends Job> createFinalJobClass(Class<? 
extends T> jobClass);
 
     @Override

http://git-wip-us.apache.org/repos/asf/deltaspike/blob/f2e4dec8/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/DynamicExpressionObserverJob.java
----------------------------------------------------------------------
diff --git 
a/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/DynamicExpressionObserverJob.java
 
b/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/DynamicExpressionObserverJob.java
new file mode 100644
index 0000000..bac885c
--- /dev/null
+++ 
b/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/DynamicExpressionObserverJob.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.deltaspike.scheduler.impl;
+
+import org.apache.deltaspike.core.api.config.ConfigResolver;
+import org.apache.deltaspike.core.api.provider.BeanProvider;
+import org.apache.deltaspike.core.spi.activation.Deactivatable;
+import org.apache.deltaspike.scheduler.spi.Scheduler;
+import org.quartz.CronScheduleBuilder;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.Job;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.JobKey;
+import org.quartz.PersistJobDataAfterExecution;
+import org.quartz.SchedulerException;
+import org.quartz.Trigger;
+import org.quartz.TriggerBuilder;
+
+import javax.inject.Inject;
+import java.util.logging.Logger;
+
+/**
+ * This job is only active, if configurable cron-expressions are used - e.g.: 
@Scheduled(cronExpression = "{myKey}").
+ * It observes jobs with configurable cron-expressions and updates their 
job-triggers once a config-change was detected.
+ * Per default this job gets executed once per minute. That can be changed via 
config-entry:
+ * deltaspike.scheduler.dynamic-expression.observer-interval=[any valid 
cron-expression]
+ */
+@DisallowConcurrentExecution
+@PersistJobDataAfterExecution
+public class DynamicExpressionObserverJob implements Deactivatable, Job
+{
+    static final String CONFIG_EXPRESSION_KEY = "ds_configExpression";
+    static final String ACTIVE_CRON_EXPRESSION_KEY = "ds_activeCronExpression";
+    static final String TRIGGER_ID_KEY = "ds_triggerKey";
+    static final String OBSERVER_POSTFIX = "_observer";
+
+    private static final Logger LOG = 
Logger.getLogger(DynamicExpressionObserverJob.class.getName());
+
+    @Inject
+    private Scheduler<Job> scheduler;
+
+    @Override
+    public void execute(JobExecutionContext context) throws 
JobExecutionException
+    {
+        JobDataMap jobDataMap = context.getMergedJobDataMap();
+        String configExpression = jobDataMap.getString(CONFIG_EXPRESSION_KEY);
+        String triggerId = jobDataMap.getString(TRIGGER_ID_KEY);
+        String activeCronExpression = 
jobDataMap.getString(ACTIVE_CRON_EXPRESSION_KEY);
+
+        String configKey = configExpression.substring(1, 
configExpression.length() - 1);
+        String configuredValue = 
ConfigResolver.getPropertyAwarePropertyValue(configKey, activeCronExpression);
+
+        if (!activeCronExpression.equals(configuredValue))
+        {
+            //both #put calls are needed currently
+            
context.getJobDetail().getJobDataMap().put(ACTIVE_CRON_EXPRESSION_KEY, 
configuredValue);
+            
context.getTrigger().getJobDataMap().put(ACTIVE_CRON_EXPRESSION_KEY, 
configuredValue);
+
+            BeanProvider.injectFields(this);
+
+            JobKey observerJobKey = context.getJobDetail().getKey();
+            String observedJobName = observerJobKey.getName()
+                .substring(0, observerJobKey.getName().length() - 
OBSERVER_POSTFIX.length());
+            JobKey observedJobKey = new JobKey(observedJobName, 
observerJobKey.getGroup());
+
+            Trigger trigger = TriggerBuilder.newTrigger()
+                    .withIdentity(triggerId)
+                    .forJob(observedJobName, observedJobKey.getGroup())
+                    
.withSchedule(CronScheduleBuilder.cronSchedule(configuredValue))
+                    .build();
+
+            //use rescheduleJob instead of delete + add
+            //(unwrap is ok here, because this class will only get active in 
case of a quartz-scheduler)
+            org.quartz.Scheduler quartzScheduler = 
scheduler.unwrap(org.quartz.Scheduler.class);
+            try
+            {
+                quartzScheduler.rescheduleJob(trigger.getKey(), trigger);
+            }
+            catch (SchedulerException e)
+            {
+                LOG.warning("failed to updated cron-expression for " + 
observedJobKey);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/deltaspike/blob/f2e4dec8/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/SchedulerBaseConfig.java
----------------------------------------------------------------------
diff --git 
a/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/SchedulerBaseConfig.java
 
b/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/SchedulerBaseConfig.java
index 225891e..127020e 100644
--- 
a/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/SchedulerBaseConfig.java
+++ 
b/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/SchedulerBaseConfig.java
@@ -28,6 +28,8 @@ public interface SchedulerBaseConfig extends 
DeltaSpikeBaseConfig
     {
         String JOB_CLASS_NAME_KEY = "deltaspike.scheduler.job-class";
         String RUNNABLE_ADAPTER_CLASS_NAME_KEY = 
"deltaspike.scheduler.runnable-adapter-class";
+        String DYNAMIC_EXPRESSION_OBSERVER_INTERVAL_KEY =
+            "deltaspike.scheduler.dynamic-expression.observer-interval";
 
         //don't type it to class to keep quartz optional
         String DEFAULT_JOB_FACTORY_CLASS_NAME = 
ConfigResolver.resolve("deltaspike.scheduler.DefaultJobFactory")
@@ -46,6 +48,12 @@ public interface SchedulerBaseConfig extends 
DeltaSpikeBaseConfig
                 .withCurrentProjectStage(true)
                 
.withDefault("org.apache.deltaspike.scheduler.impl.JobRunnableAdapter")
                 .getValue();
+
+        String DYNAMIC_EXPRESSION_OBSERVER_INTERVAL =
+                
ConfigResolver.resolve(DYNAMIC_EXPRESSION_OBSERVER_INTERVAL_KEY)
+                .withCurrentProjectStage(true)
+                .withDefault("0 0/1 * * * ?")
+                .getValue();
     }
 
     String SCHEDULER_CONFIG_FILE = 
ConfigResolver.resolve("deltaspike.scheduler.quartz_config-file")

http://git-wip-us.apache.org/repos/asf/deltaspike/blob/f2e4dec8/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/SchedulerExtension.java
----------------------------------------------------------------------
diff --git 
a/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/SchedulerExtension.java
 
b/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/SchedulerExtension.java
index 33a7489..1780373 100644
--- 
a/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/SchedulerExtension.java
+++ 
b/deltaspike/modules/scheduler/impl/src/main/java/org/apache/deltaspike/scheduler/impl/SchedulerExtension.java
@@ -45,6 +45,9 @@ public class SchedulerExtension implements Extension, 
Deactivatable
 {
     private static final Logger LOG = 
Logger.getLogger(SchedulerExtension.class.getName());
 
+    //keep it as a string (needed by some containers - due to the imports)
+    private static Set<String> classNamesToVeto = new HashSet<String>();
+
     private Boolean isActivated = true;
 
     private Set<Class> foundManagedJobClasses = new HashSet<Class>();
@@ -53,6 +56,11 @@ public class SchedulerExtension implements Extension, 
Deactivatable
 
     private Class jobClass;
 
+    public SchedulerExtension()
+    {
+        
classNamesToVeto.add("org.apache.deltaspike.scheduler.impl.DynamicExpressionObserverJob");
+    }
+
     protected void init(@Observes BeforeBeanDiscovery beforeBeanDiscovery)
     {
         this.isActivated = ClassDeactivationUtils.isActivated(getClass());
@@ -80,7 +88,7 @@ public class SchedulerExtension implements Extension, 
Deactivatable
         Class<X> beanClass = pat.getAnnotatedType().getJavaClass();
 
         //see SchedulerProducer
-        if (Scheduler.class.isAssignableFrom(beanClass))
+        if (Scheduler.class.isAssignableFrom(beanClass) || 
isInternalUnmanagedClass(beanClass))
         {
             pat.veto();
             return;
@@ -98,6 +106,11 @@ public class SchedulerExtension implements Extension, 
Deactivatable
         }
     }
 
+    private <X> boolean isInternalUnmanagedClass(Class<X> beanClass)
+    {
+        return classNamesToVeto.contains(beanClass.getName());
+    }
+
     public <X> void scheduleJobs(@Observes AfterBeanDiscovery 
afterBeanDiscovery, BeanManager beanManager)
     {
         if (!this.isActivated)

Reply via email to