CAMEL-11632: QuartzScheduledPollConsumerScheduler causes trigger misfires on 
each application start. Make this consumer more similar to the regular quartz 
consumer and how it starts.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/281864aa
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/281864aa
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/281864aa

Branch: refs/heads/camel-2.19.x
Commit: 281864aafe318b3b67c71aa1ba9faaa59599534f
Parents: bbe39e0
Author: Claus Ibsen <[email protected]>
Authored: Tue Sep 12 14:42:31 2017 +0200
Committer: Claus Ibsen <[email protected]>
Committed: Tue Sep 12 14:43:20 2017 +0200

----------------------------------------------------------------------
 .../QuartzScheduledPollConsumerScheduler.java   | 59 +++++++++++++++++--
 .../FileConsumerQuartzSchedulerRestartTest.java | 60 ++++++++++++++++++++
 2 files changed, 114 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/281864aa/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
index 6213903..02a813d 100644
--- a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
+++ b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java
@@ -33,8 +33,10 @@ import org.quartz.CronTrigger;
 import org.quartz.JobBuilder;
 import org.quartz.JobDataMap;
 import org.quartz.JobDetail;
+import org.quartz.ObjectAlreadyExistsException;
 import org.quartz.Scheduler;
 import org.quartz.SchedulerException;
+import org.quartz.SimpleTrigger;
 import org.quartz.Trigger;
 import org.quartz.TriggerBuilder;
 import org.quartz.TriggerKey;
@@ -193,7 +195,9 @@ public class QuartzScheduledPollConsumerScheduler extends 
ServiceSupport impleme
             // store additional information on job such as camel context etc
             QuartzHelper.updateJobDataMap(getCamelContext(), job, null);
 
-            trigger = TriggerBuilder.newTrigger().withIdentity(id, 
triggerGroup).withSchedule(CronScheduleBuilder.cronSchedule(getCron()).inTimeZone(getTimeZone())).build();
+            trigger = TriggerBuilder.newTrigger().withIdentity(id, 
triggerGroup)
+                
.withSchedule(CronScheduleBuilder.cronSchedule(getCron()).inTimeZone(getTimeZone()))
+                .build();
 
             LOG.debug("Scheduling job: {} with trigger: {}", job, 
trigger.getKey());
             quartzScheduler.scheduleJob(job, trigger);
@@ -206,13 +210,43 @@ public class QuartzScheduledPollConsumerScheduler extends 
ServiceSupport impleme
             jobData.put(QuartzConstants.QUARTZ_TRIGGER_CRON_EXPRESSION, 
getCron());
             jobData.put(QuartzConstants.QUARTZ_TRIGGER_CRON_TIMEZONE, 
getTimeZone().getID());
 
+            // store additional information on job such as camel context etc
             QuartzHelper.updateJobDataMap(getCamelContext(), job, null);
             LOG.debug("Updated jobData map to {}", jobData);
-            
-            // Ensure the cron schedule is updated
-            CronTrigger newTrigger = 
existingTrigger.getTriggerBuilder().withSchedule(CronScheduleBuilder.cronSchedule(getCron()).inTimeZone(getTimeZone())).build();
 
-            quartzScheduler.rescheduleJob(triggerKey, newTrigger);
+            trigger = existingTrigger.getTriggerBuilder()
+                
.withSchedule(CronScheduleBuilder.cronSchedule(getCron()).inTimeZone(getTimeZone()))
+                .build();
+
+            // Reschedule job if trigger settings were changed
+            if (hasTriggerChanged(existingTrigger, trigger)) {
+                LOG.debug("Re-scheduling job: {} with trigger: {}", job, 
trigger.getKey());
+                quartzScheduler.rescheduleJob(triggerKey, trigger);
+            } else {
+                // Schedule it now. Remember that scheduler might not be 
started it, but we can schedule now.
+                LOG.debug("Scheduling job: {} with trigger: {}", job, 
trigger.getKey());
+                try {
+                    // Schedule it now. Remember that scheduler might not be 
started it, but we can schedule now.
+                    quartzScheduler.scheduleJob(job, trigger);
+                } catch (ObjectAlreadyExistsException ex) {
+                    // some other VM might may have stored the job & trigger 
in DB in clustered mode, in the mean time
+                    QuartzComponent quartz = 
getCamelContext().getComponent("quartz2", QuartzComponent.class);
+                    if (!(quartz.isClustered())) {
+                        throw ex;
+                    } else {
+                        trigger = (CronTrigger) 
quartzScheduler.getTrigger(triggerKey);
+                        if (trigger == null) {
+                            throw new SchedulerException("Trigger could not be 
found in quartz scheduler.");
+                        }
+                    }
+                }
+            }
+        }
+
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Job {} (triggerType={}, jobClass={}) is scheduled. Next 
fire date is {}",
+                new Object[] {trigger.getKey(), 
trigger.getClass().getSimpleName(),
+                    job.getJobClass().getSimpleName(), 
trigger.getNextFireTime()});
         }
     }
 
@@ -236,4 +270,19 @@ public class QuartzScheduledPollConsumerScheduler extends 
ServiceSupport impleme
         }
     }
 
+    private boolean hasTriggerChanged(Trigger oldTrigger, Trigger newTrigger) {
+        if (newTrigger instanceof CronTrigger && oldTrigger instanceof 
CronTrigger) {
+            CronTrigger newCron = (CronTrigger) newTrigger;
+            CronTrigger oldCron = (CronTrigger) oldTrigger;
+            return 
!newCron.getCronExpression().equals(oldCron.getCronExpression());
+        } else if (newTrigger instanceof SimpleTrigger && oldTrigger 
instanceof SimpleTrigger) {
+            SimpleTrigger newSimple = (SimpleTrigger) newTrigger;
+            SimpleTrigger oldSimple = (SimpleTrigger) oldTrigger;
+            return newSimple.getRepeatInterval() != 
oldSimple.getRepeatInterval()
+                || newSimple.getRepeatCount() != oldSimple.getRepeatCount();
+        } else {
+            return !newTrigger.getClass().equals(oldTrigger.getClass()) || 
!newTrigger.equals(oldTrigger);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/281864aa/components/camel-quartz2/src/test/java/org/apache/camel/pollconsumer/quartz2/FileConsumerQuartzSchedulerRestartTest.java
----------------------------------------------------------------------
diff --git a/components/camel-quartz2/src/test/java/org/apache/camel/pollconsumer/quartz2/FileConsumerQuartzSchedulerRestartTest.java b/components/camel-quartz2/src/test/java/org/apache/camel/pollconsumer/quartz2/FileConsumerQuartzSchedulerRestartTest.java
new file mode 100644
index 0000000..7d1e991
--- /dev/null
+++ b/components/camel-quartz2/src/test/java/org/apache/camel/pollconsumer/quartz2/FileConsumerQuartzSchedulerRestartTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.pollconsumer.quartz2;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class FileConsumerQuartzSchedulerRestartTest extends CamelTestSupport {
+
+    @Override
+    public void setUp() throws Exception {
+        deleteDirectory("target/file/quartz");
+        super.setUp();
+    }
+
+    @Test
+    public void testQuartzSchedulerRestart() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+        template.sendBodyAndHeader("file:target/file/quartz", "Hello World", 
Exchange.FILE_NAME, "hello.txt");
+        context.startRoute("foo");
+        assertMockEndpointsSatisfied();
+
+        context.stopRoute("foo");
+        resetMocks();
+
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+        template.sendBodyAndHeader("file:target/file/quartz", "Bye World", 
Exchange.FILE_NAME, "bye.txt");
+        context.startRoute("foo");
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                
from("file:target/file/quartz?scheduler=quartz2&scheduler.cron=0/2+*+*+*+*+?&scheduler.triggerGroup=myGroup&scheduler.triggerId=myId")
+                    .routeId("foo").noAutoStartup()
+                        .to("mock:result");
+            }
+        };
+    }
+
+}

Reply via email to