Author: tabish
Date: Wed Feb 9 16:05:24 2011
New Revision: 1068952
URL: http://svn.apache.org/viewvc?rev=1068952&view=rev
Log:
Apply fix and test for: https://issues.apache.org/jira/browse/AMQ-3141
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java?rev=1068952&r1=1068951&r2=1068952&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/scheduler/JobSchedulerImpl.java
Wed Feb 9 16:05:24 2011
@@ -454,73 +454,77 @@ class JobSchedulerImpl extends ServiceSu
// peek the next job
long currentTime = System.currentTimeMillis();
- Map.Entry<Long, List<JobLocation>> first = getNextToSchedule();
- if (first != null) {
- List<JobLocation> list = new
ArrayList<JobLocation>(first.getValue());
- final long executionTime = first.getKey();
- long nextExecutionTime = 0;
- if (executionTime <= currentTime) {
-
- for (final JobLocation job : list) {
- int repeat = job.getRepeat();
- nextExecutionTime =
calculateNextExecutionTime(job, currentTime, repeat);
- long waitTime = nextExecutionTime - currentTime;
- this.scheduleTime.setWaitTime(waitTime);
- if (job.isCron() == false) {
- fireJob(job);
- if (repeat != 0) {
- repeat--;
- job.setRepeat(repeat);
- // remove this job from the index - so it
- // doesn't get destroyed
- removeFromIndex(executionTime,
job.getJobId());
- // and re-store it
- storeJob(job, nextExecutionTime);
- }
- } else {
- // cron job
- if (repeat == 0) {
- // we haven't got a separate scheduler to
- // execute at
- // this time - just a cron job - so fire it
+ // Reads the list of the next entries and removes them from
the store in one atomic step.
+ // Prevents race conditions on short delays, when storeJob()
tries to append new items to the
+ // existing list during this read operation (see AMQ-3141).
+ synchronized (this) {
+ Map.Entry<Long, List<JobLocation>> first =
getNextToSchedule();
+ if (first != null) {
+ List<JobLocation> list = new
ArrayList<JobLocation>(first.getValue());
+ final long executionTime = first.getKey();
+ long nextExecutionTime = 0;
+ if (executionTime <= currentTime) {
+
+ for (final JobLocation job : list) {
+ int repeat = job.getRepeat();
+ nextExecutionTime =
calculateNextExecutionTime(job, currentTime, repeat);
+ long waitTime = nextExecutionTime -
currentTime;
+ this.scheduleTime.setWaitTime(waitTime);
+ if (job.isCron() == false) {
fireJob(job);
- }
- if (nextExecutionTime > currentTime) {
- // we will run again ...
- // remove this job from the index - so it
- // doesn't get destroyed
- removeFromIndex(executionTime,
job.getJobId());
- // and re-store it
- storeJob(job, nextExecutionTime);
if (repeat != 0) {
- // we have a separate schedule to run
at
- // this time
- // so the cron job is used to set of a
- // seperate scheule
- // hence we won't fire the original
cron
- // job to the listeners
- // but we do need to start a separate
- // schedule
- String jobId =
ID_GENERATOR.generateId();
- ByteSequence payload =
getPayload(job.getLocation());
- schedule(jobId, payload, "",
job.getDelay(), job.getPeriod(), job.getRepeat());
- waitTime = job.getDelay() != 0 ?
job.getDelay() : job.getPeriod();
-
this.scheduleTime.setWaitTime(waitTime);
+ repeat--;
+ job.setRepeat(repeat);
+ // remove this job from the index - so
it
+ // doesn't get destroyed
+ removeFromIndex(executionTime,
job.getJobId());
+ // and re-store it
+ storeJob(job, nextExecutionTime);
+ }
+ } else {
+ // cron job
+ if (repeat == 0) {
+ // we haven't got a separate scheduler
to
+ // execute at
+ // this time - just a cron job - so
fire it
+ fireJob(job);
+ }
+ if (nextExecutionTime > currentTime) {
+ // we will run again ...
+ // remove this job from the index - so
it
+ // doesn't get destroyed
+ removeFromIndex(executionTime,
job.getJobId());
+ // and re-store it
+ storeJob(job, nextExecutionTime);
+ if (repeat != 0) {
+ // we have a separate schedule to
run at
+ // this time
+ // so the cron job is used to set
of a
+ // seperate scheule
+ // hence we won't fire the
original cron
+ // job to the listeners
+ // but we do need to start a
separate
+ // schedule
+ String jobId =
ID_GENERATOR.generateId();
+ ByteSequence payload =
getPayload(job.getLocation());
+ schedule(jobId, payload, "",
job.getDelay(), job.getPeriod(), job.getRepeat());
+ waitTime = job.getDelay() != 0 ?
job.getDelay() : job.getPeriod();
+
this.scheduleTime.setWaitTime(waitTime);
+ }
}
}
}
+ // now remove all jobs that have not been
+ // rescheduled from this execution time
+ remove(executionTime);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Not yet time to execute the job,
waiting " + (executionTime - currentTime) + " ms");
+ }
+ this.scheduleTime.setWaitTime(executionTime -
currentTime);
}
- // now remove all jobs that have not been
- // rescheduled from this execution time
- remove(executionTime);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Not yet time to execute the job,
waiting " + (executionTime - currentTime) + " ms");
- }
- this.scheduleTime.setWaitTime(executionTime -
currentTime);
}
}
-
this.scheduleTime.pause();
} catch (Exception ioe) {
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java?rev=1068952&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java
Wed Feb 9 16:05:24 2011
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.IOHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQ3141Test {
+
+ private static final int MAX_MESSAGES = 100;
+
+ private static final long DELAY_IN_MS = 100;
+
+ private static final String QUEUE_NAME = "target.queue";
+
+ private BrokerService broker;
+
+ private final CountDownLatch messageCountDown = new
CountDownLatch(MAX_MESSAGES);
+
+ private ConnectionFactory factory;
+
+ @Before
+ public void setup() throws Exception {
+
+ broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.setSchedulerSupport(true);
+ broker.setDataDirectory("target");
+ broker.setUseJmx(false);
+ broker.addConnector("vm://localhost");
+
+ File schedulerDirectory = new File("target/test/ScheduledDB");
+ IOHelper.mkdirs(schedulerDirectory);
+ IOHelper.deleteChildren(schedulerDirectory);
+ broker.setSchedulerDirectoryFile(schedulerDirectory);
+
+ broker.start();
+ broker.waitUntilStarted();
+
+ factory = new ActiveMQConnectionFactory("vm://localhost");
+ }
+
+ private void sendMessages() throws Exception {
+ Connection connection = factory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(session.createQueue(QUEUE_NAME));
+ for (int i = 0; i < MAX_MESSAGES; i++) {
+ Message message = session.createTextMessage();
+ message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,
DELAY_IN_MS);
+ producer.send(message);
+ }
+ connection.close();
+ }
+
+ @Test
+ public void testNoMissingMessagesOnShortScheduleDelay() throws Exception {
+
+ Connection connection = factory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer =
session.createConsumer(session.createQueue(QUEUE_NAME));
+
+ consumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ messageCountDown.countDown();
+ }
+ });
+ sendMessages();
+
+ boolean receiveComplete = messageCountDown.await(5, TimeUnit.SECONDS);
+
+ connection.close();
+
+ assertTrue("expect all messages received but " +
messageCountDown.getCount() + " are missing", receiveComplete);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ broker.stop();
+ }
+
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java
------------------------------------------------------------------------------
svn:eol-style = native