Updated Branches: refs/heads/trunk edc159901 -> d1446c3bc
fix and tests for: https://issues.apache.org/jira/browse/AMQ-4073 Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d1446c3b Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d1446c3b Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d1446c3b Branch: refs/heads/trunk Commit: d1446c3bca26cec7af963c0562b54f95f8e6b43e Parents: edc1599 Author: Timothy Bish <[email protected]> Authored: Thu Sep 5 15:59:01 2013 -0400 Committer: Timothy Bish <[email protected]> Committed: Thu Sep 5 15:59:01 2013 -0400 ---------------------------------------------------------------------- .../store/kahadb/scheduler/JobImpl.java | 18 +- .../store/kahadb/scheduler/JobLocation.java | 98 +++++++++-- .../kahadb/scheduler/JobSchedulerImpl.java | 168 ++++++++++++------- .../JobSchedulerBrokerShutdownTest.java | 77 +++++++++ .../org/apache/activemq/bugs/AMQ3140Test.java | 2 +- 5 files changed, 283 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/d1446c3b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java index ae291a3..86b9fa3 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobImpl.java @@ -20,54 +20,60 @@ import org.apache.activemq.broker.scheduler.Job; import org.apache.activemq.broker.scheduler.JobSupport; import org.apache.activemq.util.ByteSequence; - public class JobImpl implements Job { + private final JobLocation jobLocation; private final byte[] payload; - + protected JobImpl(JobLocation location,ByteSequence bs) { this.jobLocation=location; this.payload = new byte[bs.getLength()]; System.arraycopy(bs.getData(), bs.getOffset(), this.payload, 0, bs.getLength()); } + @Override public String getJobId() { return this.jobLocation.getJobId(); } + @Override public byte[] getPayload() { return this.payload; } + @Override public long getPeriod() { return this.jobLocation.getPeriod(); } + @Override public int getRepeat() { return this.jobLocation.getRepeat(); } + @Override public long getStart() { return this.jobLocation.getStartTime(); } - + + @Override public long getDelay() { return this.jobLocation.getDelay(); } + @Override public String getCronEntry() { return this.jobLocation.getCronEntry(); } - - + @Override public String getNextExecutionTime() { return JobSupport.getDateTime(this.jobLocation.getNextTime()); } + @Override public String getStartTime() { return JobSupport.getDateTime(getStart()); } - } http://git-wip-us.apache.org/repos/asf/activemq/blob/d1446c3b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java index 0acaa7c..13cf376 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobLocation.java @@ -27,7 +27,7 @@ import org.apache.activemq.store.kahadb.disk.journal.Location; import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; class JobLocation { - + private String jobId; private int repeat; private long startTime; @@ -39,13 +39,12 @@ class JobLocation { public JobLocation(Location location) { this.location = location; - } public JobLocation() { this(new Location()); } - + public void readExternal(DataInput in) throws IOException { this.jobId = in.readUTF(); this.repeat = in.readInt(); @@ -85,7 +84,6 @@ class JobLocation { public void setJobId(String jobId) { this.jobId = jobId; } - /** * @return the repeat @@ -116,7 +114,7 @@ class JobLocation { public void setStartTime(long start) { this.startTime = start; } - + /** * @return the nextTime */ @@ -145,7 +143,7 @@ class JobLocation { public void setPeriod(long period) { this.period = period; } - + /** * @return the cronEntry */ @@ -159,11 +157,14 @@ class JobLocation { public synchronized void setCronEntry(String cronEntry) { this.cronEntry = cronEntry; } - + + /** + * @return if this JobLocation represents a cron entry. + */ public boolean isCron() { return getCronEntry() != null && getCronEntry().length() > 0; } - + /** * @return the delay */ @@ -184,15 +185,17 @@ class JobLocation { public Location getLocation() { return this.location; } - + + @Override public String toString() { return "Job [id=" + jobId + ", startTime=" + new Date(startTime) + ", delay=" + delay + ", period=" + period + ", repeat=" - + repeat + ", nextTime=" + new Date(nextTime) + "]"; + + repeat + ", nextTime=" + new Date(nextTime) + "]"; } static class JobLocationMarshaller extends VariableMarshaller<List<JobLocation>> { static final JobLocationMarshaller INSTANCE = new JobLocationMarshaller(); + @Override public List<JobLocation> readPayload(DataInput dataIn) throws IOException { List<JobLocation> result = new ArrayList<JobLocation>(); int size = dataIn.readInt(); @@ -204,6 +207,7 @@ class JobLocation { return result; } + @Override public void writePayload(List<JobLocation> value, DataOutput dataOut) throws IOException { dataOut.writeInt(value.size()); for (JobLocation jobLocation : value) { @@ -211,4 +215,78 @@ class JobLocation { } } } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((cronEntry == null) ? 0 : cronEntry.hashCode()); + result = prime * result + (int) (delay ^ (delay >>> 32)); + result = prime * result + ((jobId == null) ? 0 : jobId.hashCode()); + result = prime * result + ((location == null) ? 0 : location.hashCode()); + result = prime * result + (int) (nextTime ^ (nextTime >>> 32)); + result = prime * result + (int) (period ^ (period >>> 32)); + result = prime * result + repeat; + result = prime * result + (int) (startTime ^ (startTime >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + JobLocation other = (JobLocation) obj; + + if (cronEntry == null) { + if (other.cronEntry != null) { + return false; + } + } else if (!cronEntry.equals(other.cronEntry)) { + return false; + } + + if (delay != other.delay) { + return false; + } + + if (jobId == null) { + if (other.jobId != null) + return false; + } else if (!jobId.equals(other.jobId)) { + return false; + } + + if (location == null) { + if (other.location != null) { + return false; + } + } else if (!location.equals(other.location)) { + return false; + } + + if (nextTime != other.nextTime) { + return false; + } + if (period != other.period) { + return false; + } + if (repeat != other.repeat) { + return false; + } + if (startTime != other.startTime) { + return false; + } + + return true; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/d1446c3b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java index 062af74..415a292 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerImpl.java @@ -163,6 +163,15 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler }); } + synchronized void remove(final long time, final List<JobLocation> jobIds) throws IOException { + this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() { + @Override + public void execute(Transaction tx) throws IOException { + remove(tx, time, jobIds); + } + }); + } + /* * (non-Javadoc) * @@ -369,6 +378,34 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler return result; } + private void remove(Transaction tx, long time, List<JobLocation> jobIds) throws IOException { + List<JobLocation> result = removeFromIndex(tx, time, jobIds); + if (result != null) { + for (JobLocation jl : result) { + this.store.decrementJournalCount(tx, jl.getLocation()); + } + } + } + + private List<JobLocation> removeFromIndex(Transaction tx, long time, List<JobLocation> Jobs) throws IOException { + List<JobLocation> result = null; + List<JobLocation> values = this.index.remove(tx, time); + if (values != null) { + result = new ArrayList<JobLocation>(values.size()); + + for (JobLocation job : Jobs) { + if (values.remove(job)) { + result.add(job); + } + } + + if (!values.isEmpty()) { + this.index.put(tx, time, values); + } + } + return result; + } + void remove(Transaction tx, long time) throws IOException { List<JobLocation> values = this.index.remove(tx, time); if (values != null) { @@ -482,79 +519,84 @@ class JobSchedulerImpl extends ServiceSupport implements Runnable, JobScheduler // peek the next job long currentTime = System.currentTimeMillis(); - // Reads the list of the next entries and removes them from the store in one atomic step. - // Prevents race conditions on short delays, when storeJob() tries to append new items to the - // existing list during this read operation (see AMQ-3141). - synchronized (this) { - Map.Entry<Long, List<JobLocation>> first = getNextToSchedule(); - if (first != null) { - List<JobLocation> list = new ArrayList<JobLocation>(first.getValue()); - final long executionTime = first.getKey(); - long nextExecutionTime = 0; - if (executionTime <= currentTime) { - for (final JobLocation job : list) { - int repeat = job.getRepeat(); - nextExecutionTime = calculateNextExecutionTime(job, currentTime, repeat); - long waitTime = nextExecutionTime - currentTime; - this.scheduleTime.setWaitTime(waitTime); - if (job.isCron() == false) { + // Read the list of scheduled events and fire the jobs. Once done with the batch + // remove all that were fired, and reschedule as needed. + Map.Entry<Long, List<JobLocation>> first = getNextToSchedule(); + if (first != null) { + List<JobLocation> list = new ArrayList<JobLocation>(first.getValue()); + List<JobLocation> fired = new ArrayList<JobLocation>(list.size()); + final long executionTime = first.getKey(); + long nextExecutionTime = 0; + if (executionTime <= currentTime) { + for (final JobLocation job : list) { + int repeat = job.getRepeat(); + nextExecutionTime = calculateNextExecutionTime(job, currentTime, repeat); + long waitTime = nextExecutionTime - currentTime; + this.scheduleTime.setWaitTime(waitTime); + if (job.isCron() == false) { + fireJob(job); + if (repeat != 0) { + repeat--; + job.setRepeat(repeat); + // remove this job from the index so it doesn't get destroyed + removeFromIndex(executionTime, job.getJobId()); + // and re-store it + storeJob(job, nextExecutionTime); + } else { + fired.add(job); + } + } else { + // cron job will have a repeat time. + if (repeat == 0) { + // we haven't got a separate scheduler to execute at + // this time - just a cron job - so fire it fireJob(job); + } + + if (nextExecutionTime > currentTime) { + // we will run again ... + // remove this job from the index - so it doesn't get destroyed + removeFromIndex(executionTime, job.getJobId()); + // and re-store it + storeJob(job, nextExecutionTime); if (repeat != 0) { - repeat--; - job.setRepeat(repeat); - // remove this job from the index so it doesn't get destroyed - removeFromIndex(executionTime, job.getJobId()); - // and re-store it - storeJob(job, nextExecutionTime); + // we have a separate schedule to run at this time + // so the cron job is used to set of a separate schedule + // hence we won't fire the original cron job to the + // listeners but we do need to start a separate schedule + String jobId = ID_GENERATOR.generateId(); + ByteSequence payload = getPayload(job.getLocation()); + schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat()); + waitTime = job.getDelay() != 0 ? job.getDelay() : job.getPeriod(); + this.scheduleTime.setWaitTime(waitTime); } } else { - // cron job - if (repeat == 0) { - // we haven't got a separate scheduler to execute at - // this time - just a cron job - so fire it - fireJob(job); - } - if (nextExecutionTime > currentTime) { - // we will run again ... - // remove this job from the index - so it doesn't get destroyed - removeFromIndex(executionTime, job.getJobId()); - // and re-store it - storeJob(job, nextExecutionTime); - if (repeat != 0) { - // we have a separate schedule to run at this time - // so the cron job is used to set of a separate schedule - // hence we won't fire the original cron job to the - // listeners but we do need to start a separate schedule - String jobId = ID_GENERATOR.generateId(); - ByteSequence payload = getPayload(job.getLocation()); - schedule(jobId, payload, "", job.getDelay(), job.getPeriod(), job.getRepeat()); - waitTime = job.getDelay() != 0 ? job.getDelay() : job.getPeriod(); - this.scheduleTime.setWaitTime(waitTime); - } - } + fired.add(job); } } - // now remove all jobs that have not been - // rescheduled from this execution time - remove(executionTime); - - // If there is a job that should fire before the currently set wait time - // we need to reset wait time otherwise we'll miss it. - Map.Entry<Long, List<JobLocation>> nextUp = getNextToSchedule(); - if (nextUp != null) { - final long timeUntilNextScheduled = nextUp.getKey() - currentTime; - if (timeUntilNextScheduled < this.scheduleTime.getWaitTime()) { - this.scheduleTime.setWaitTime(timeUntilNextScheduled); - } - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Not yet time to execute the job, waiting " + (executionTime - currentTime) + " ms"); + } + + // now remove all jobs that have not been rescheduled from this execution + // time, if there are no more entries in that time it will be removed. + remove(executionTime, fired); + + // If there is a job that should fire before the currently set wait time + // we need to reset wait time otherwise we'll miss it. + Map.Entry<Long, List<JobLocation>> nextUp = getNextToSchedule(); + if (nextUp != null) { + final long timeUntilNextScheduled = nextUp.getKey() - currentTime; + if (timeUntilNextScheduled < this.scheduleTime.getWaitTime()) { + this.scheduleTime.setWaitTime(timeUntilNextScheduled); } - this.scheduleTime.setWaitTime(executionTime - currentTime); } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Not yet time to execute the job, waiting " + (executionTime - currentTime) + " ms"); + } + this.scheduleTime.setWaitTime(executionTime - currentTime); } } + this.scheduleTime.pause(); } catch (Exception ioe) { LOG.error(this.name + " Failed to schedule job", ioe); http://git-wip-us.apache.org/repos/asf/activemq/blob/d1446c3b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java new file mode 100644 index 0000000..04277bf --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.scheduler; + +import java.io.File; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.Session; + +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.ScheduledMessage; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.IOHelper; +import org.apache.activemq.util.ProducerThread; + +public class JobSchedulerBrokerShutdownTest extends EmbeddedBrokerTestSupport { + + @Override + protected BrokerService createBroker() throws Exception { + File schedulerDirectory = new File("target/scheduler"); + + IOHelper.mkdirs(schedulerDirectory); + IOHelper.deleteChildren(schedulerDirectory); + + BrokerService broker = super.createBroker(); + broker.setSchedulerSupport(true); + broker.setSchedulerDirectoryFile(schedulerDirectory); + broker.getSystemUsage().getStoreUsage().setLimit(1 * 512); + broker.deleteAllMessages(); + return broker; + } + + @Override + protected boolean isPersistent() { + return true; + } + + public void testSchedule() throws Exception { + + Connection connection = createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + connection.start(); + final long time = 1000; + + ProducerThread producer = new ProducerThread(session, destination) { + @Override + protected Message createMessage(int i) throws Exception { + Message message = super.createMessage(i); + message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); + return message; + } + }; + + producer.setMessageCount(200); + producer.setDaemon(true); + + producer.start(); + + Thread.sleep(5000); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/d1446c3b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java index 0e6cc32..fd71558 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java @@ -134,7 +134,7 @@ public class AMQ3140Test { } // wait until all scheduled messages has been received - TimeUnit.SECONDS.sleep(10); + TimeUnit.MINUTES.sleep(2); session.close(); connection.close();
