CAMEL-11632: QuartzScheduledPollConsumerScheduler causes trigger misfires on each application start. Make this consumer more similar to the regular quartz consumer and how it start.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/281864aa Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/281864aa Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/281864aa Branch: refs/heads/camel-2.19.x Commit: 281864aafe318b3b67c71aa1ba9faaa59599534f Parents: bbe39e0 Author: Claus Ibsen <[email protected]> Authored: Tue Sep 12 14:42:31 2017 +0200 Committer: Claus Ibsen <[email protected]> Committed: Tue Sep 12 14:43:20 2017 +0200 ---------------------------------------------------------------------- .../QuartzScheduledPollConsumerScheduler.java | 59 +++++++++++++++++-- .../FileConsumerQuartzSchedulerRestartTest.java | 60 ++++++++++++++++++++ 2 files changed, 114 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/281864aa/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java ---------------------------------------------------------------------- diff --git a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java index 6213903..02a813d 100644 --- a/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java +++ b/components/camel-quartz2/src/main/java/org/apache/camel/pollconsumer/quartz2/QuartzScheduledPollConsumerScheduler.java @@ -33,8 +33,10 @@ import org.quartz.CronTrigger; import org.quartz.JobBuilder; import org.quartz.JobDataMap; import org.quartz.JobDetail; +import org.quartz.ObjectAlreadyExistsException; import org.quartz.Scheduler; import org.quartz.SchedulerException; +import org.quartz.SimpleTrigger; import org.quartz.Trigger; import org.quartz.TriggerBuilder; import org.quartz.TriggerKey; @@ -193,7 +195,9 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme // store additional information on job such as camel context etc QuartzHelper.updateJobDataMap(getCamelContext(), job, null); - trigger = TriggerBuilder.newTrigger().withIdentity(id, triggerGroup).withSchedule(CronScheduleBuilder.cronSchedule(getCron()).inTimeZone(getTimeZone())).build(); + trigger = TriggerBuilder.newTrigger().withIdentity(id, triggerGroup) + .withSchedule(CronScheduleBuilder.cronSchedule(getCron()).inTimeZone(getTimeZone())) + .build(); LOG.debug("Scheduling job: {} with trigger: {}", job, trigger.getKey()); quartzScheduler.scheduleJob(job, trigger); @@ -206,13 +210,43 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme jobData.put(QuartzConstants.QUARTZ_TRIGGER_CRON_EXPRESSION, getCron()); jobData.put(QuartzConstants.QUARTZ_TRIGGER_CRON_TIMEZONE, getTimeZone().getID()); + // store additional information on job such as camel context etc QuartzHelper.updateJobDataMap(getCamelContext(), job, null); LOG.debug("Updated jobData map to {}", jobData); - - // Ensure the cron schedule is updated - CronTrigger newTrigger = existingTrigger.getTriggerBuilder().withSchedule(CronScheduleBuilder.cronSchedule(getCron()).inTimeZone(getTimeZone())).build(); - quartzScheduler.rescheduleJob(triggerKey, newTrigger); + trigger = existingTrigger.getTriggerBuilder() + .withSchedule(CronScheduleBuilder.cronSchedule(getCron()).inTimeZone(getTimeZone())) + .build(); + + // Reschedule job if trigger settings were changed + if (hasTriggerChanged(existingTrigger, trigger)) { + LOG.debug("Re-scheduling job: {} with trigger: {}", job, trigger.getKey()); + quartzScheduler.rescheduleJob(triggerKey, trigger); + } else { + // Schedule it now. Remember that scheduler might not be started it, but we can schedule now. + LOG.debug("Scheduling job: {} with trigger: {}", job, trigger.getKey()); + try { + // Schedule it now. Remember that scheduler might not be started it, but we can schedule now. + quartzScheduler.scheduleJob(job, trigger); + } catch (ObjectAlreadyExistsException ex) { + // some other VM might may have stored the job & trigger in DB in clustered mode, in the mean time + QuartzComponent quartz = getCamelContext().getComponent("quartz2", QuartzComponent.class); + if (!(quartz.isClustered())) { + throw ex; + } else { + trigger = (CronTrigger) quartzScheduler.getTrigger(triggerKey); + if (trigger == null) { + throw new SchedulerException("Trigger could not be found in quartz scheduler."); + } + } + } + } + } + + if (LOG.isInfoEnabled()) { + LOG.info("Job {} (triggerType={}, jobClass={}) is scheduled. Next fire date is {}", + new Object[] {trigger.getKey(), trigger.getClass().getSimpleName(), + job.getJobClass().getSimpleName(), trigger.getNextFireTime()}); } } @@ -236,4 +270,19 @@ public class QuartzScheduledPollConsumerScheduler extends ServiceSupport impleme } } + private boolean hasTriggerChanged(Trigger oldTrigger, Trigger newTrigger) { + if (newTrigger instanceof CronTrigger && oldTrigger instanceof CronTrigger) { + CronTrigger newCron = (CronTrigger) newTrigger; + CronTrigger oldCron = (CronTrigger) oldTrigger; + return !newCron.getCronExpression().equals(oldCron.getCronExpression()); + } else if (newTrigger instanceof SimpleTrigger && oldTrigger instanceof SimpleTrigger) { + SimpleTrigger newSimple = (SimpleTrigger) newTrigger; + SimpleTrigger oldSimple = (SimpleTrigger) oldTrigger; + return newSimple.getRepeatInterval() != oldSimple.getRepeatInterval() + || newSimple.getRepeatCount() != oldSimple.getRepeatCount(); + } else { + return !newTrigger.getClass().equals(oldTrigger.getClass()) || !newTrigger.equals(oldTrigger); + } + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/281864aa/components/camel-quartz2/src/test/java/org/apache/camel/pollconsumer/quartz2/FileConsumerQuartzSchedulerRestartTest.java ---------------------------------------------------------------------- diff --git a/components/camel-quartz2/src/test/java/org/apache/camel/pollconsumer/quartz2/FileConsumerQuartzSchedulerRestartTest.java b/components/camel-quartz2/src/test/java/org/apache/camel/pollconsumer/quartz2/FileConsumerQuartzSchedulerRestartTest.java new file mode 100644 index 0000000..7d1e991 --- /dev/null +++ b/components/camel-quartz2/src/test/java/org/apache/camel/pollconsumer/quartz2/FileConsumerQuartzSchedulerRestartTest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.pollconsumer.quartz2; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class FileConsumerQuartzSchedulerRestartTest extends CamelTestSupport { + + @Override + public void setUp() throws Exception { + deleteDirectory("target/file/quartz"); + super.setUp(); + } + + @Test + public void testQuartzSchedulerRestart() throws Exception { + getMockEndpoint("mock:result").expectedMessageCount(1); + template.sendBodyAndHeader("file:target/file/quartz", "Hello World", Exchange.FILE_NAME, "hello.txt"); + context.startRoute("foo"); + assertMockEndpointsSatisfied(); + + context.stopRoute("foo"); + resetMocks(); + + getMockEndpoint("mock:result").expectedMessageCount(1); + template.sendBodyAndHeader("file:target/file/quartz", "Bye World", Exchange.FILE_NAME, "bye.txt"); + context.startRoute("foo"); + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("file:target/file/quartz?scheduler=quartz2&scheduler.cron=0/2+*+*+*+*+?&scheduler.triggerGroup=myGroup&scheduler.triggerId=myId") + .routeId("foo").noAutoStartup() + .to("mock:result"); + } + }; + } + +}
