Author: rajdavies
Date: Fri Sep 4 14:25:18 2009
New Revision: 811425
URL: http://svn.apache.org/viewvc?rev=811425&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2356
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=811425&r1=811424&r2=811425&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Sep 4 14:25:18 2009
@@ -16,29 +16,6 @@
*/
package org.apache.activemq.broker.region;
-import java.io.IOException;
-import java.util.AbstractList;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
-
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -77,6 +54,26 @@
import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import java.util.AbstractList;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
/**
@@ -100,7 +97,7 @@
private final Object sendLock = new Object();
private ExecutorService executor;
protected final LinkedList<Runnable> messagesWaitingForSpace = new
LinkedList<Runnable>();
- private final ReentrantLock dispatchLock = new ReentrantLock();
+ private final Object dispatchMutex = new Object();
private boolean useConsumerPriority=true;
private boolean strictOrderDispatch=false;
private QueueDispatchSelector dispatchSelector;
@@ -276,8 +273,8 @@
// synchronize with dispatch method so that no new messages are sent
// while setting up a subscription. avoid out of order messages,
// duplicates, etc.
- dispatchLock.lock();
- try {
+ synchronized(dispatchMutex) {
+
sub.add(context, this);
destinationStatistics.getConsumers().increment();
@@ -324,8 +321,6 @@
if (!(this.optimizedDispatch || isSlave())) {
wakeup();
}
- }finally {
- dispatchLock.unlock();
}
if (this.optimizedDispatch || isSlave()) {
// Outside of dispatchLock() to maintain the lock hierarchy of
@@ -339,8 +334,7 @@
destinationStatistics.getConsumers().decrement();
// synchronize with dispatch method so that no new messages are sent
// while removing up a subscription.
- dispatchLock.lock();
- try {
+ synchronized(dispatchMutex) {
if (LOG.isDebugEnabled()) {
LOG.debug("remove sub: " + sub + ", lastDeliveredSeqId: " +
lastDeiveredSequenceId
+ ", dequeues: " +
getDestinationStatistics().getDequeues().getCount()
@@ -390,8 +384,6 @@
if (!(this.optimizedDispatch || isSlave())) {
wakeup();
}
- }finally {
- dispatchLock.unlock();
}
if (this.optimizedDispatch || isSlave()) {
// Outside of dispatchLock() to maintain the lock hierarchy of
@@ -750,8 +742,7 @@
try {
pageInMessages(forcePageIn);
List<MessageReference> toExpire = new
ArrayList<MessageReference>();
- dispatchLock.lock();
- try {
+ synchronized(dispatchMutex) {
synchronized (pagedInPendingDispatch) {
addAll(pagedInPendingDispatch, l, max, toExpire);
for (MessageReference ref : toExpire) {
@@ -796,9 +787,7 @@
}
}
}
- } finally {
- dispatchLock.unlock();
- }
+ }
} catch (Exception e) {
LOG.error("Problem retrieving message for browse", e);
}
@@ -1161,12 +1150,9 @@
// Kinda ugly.. but I think dispatchLock is the only mutex
protecting the
// pagedInPendingDispatch variable.
- dispatchLock.lock();
- try {
+ synchronized(dispatchMutex) {
pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
- } finally {
- dispatchLock.unlock();
- }
+ }
// Perhaps we should page always into the
pagedInPendingDispatch list is
// !messages.isEmpty(), and then if
!pagedInPendingDispatch.isEmpty()
@@ -1328,8 +1314,7 @@
private List<QueueMessageReference> doPageIn(boolean force) throws
Exception {
List<QueueMessageReference> result = null;
List<QueueMessageReference> resultList = null;
- dispatchLock.lock();
- try{
+ synchronized(dispatchMutex) {
int toPageIn = getMaxPageSize() + Math.max(0,
(int)destinationStatistics.getInflight().getCount()) - pagedInMessages.size();
if (LOG.isDebugEnabled()) {
@@ -1381,15 +1366,13 @@
// Avoid return null list, if condition is not validated
resultList = new ArrayList<QueueMessageReference>();
}
- }finally {
- dispatchLock.unlock();
}
return resultList;
}
private void doDispatch(List<QueueMessageReference> list) throws Exception
{
- dispatchLock.lock();
- try {
+ synchronized(dispatchMutex) {
+
synchronized (pagedInPendingDispatch) {
if (!pagedInPendingDispatch.isEmpty()) {
// Try to first dispatch anything that had not been
@@ -1412,9 +1395,7 @@
}
}
}
- } finally {
- dispatchLock.unlock();
- }
+ }
}
/**
@@ -1545,8 +1526,7 @@
QueueMessageReference message = null;
MessageId messageId = messageDispatchNotification.getMessageId();
- dispatchLock.lock();
- try {
+ synchronized(dispatchMutex) {
synchronized (pagedInPendingDispatch) {
for(QueueMessageReference ref : pagedInPendingDispatch) {
if (messageId.equals(ref.getMessageId())) {
@@ -1590,9 +1570,7 @@
}
}
- } finally {
- dispatchLock.unlock();
- }
+ }
if (message == null) {
throw new JMSException(
"Slave broker out of sync with master - Message: "
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java?rev=811425&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java
Fri Sep 4 14:25:18 2009
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import
org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import
org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.perf.NumberOfDestinationsTest;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import java.io.File;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.TestCase;
+/*
+Regression test for AMQ-2356. The original bug report follows:
+We have an environment where we have a very large number of destinations.
+In an effort to reduce the number of threads I have set the options
+-Dorg.apache.activemq.UseDedicatedTaskRunner=false
+
+and
+
+<policyEntry queue=">" optimizedDispatch="true"/>
+
+Unfortunately this very quickly leads to deadlocked queues.
+
+My environment is:
+
+ActiveMQ 5.2 Ubuntu Jaunty kernel 2.6.28-14-generic #47-Ubuntu SMP (although
only a single core on my system)
+TCP transportConnector
+
+To reproduce the bug (which I can do 100% of the time) I connect 5 consumers
(AUTO_ACK) to 5 different queues.
+Then I start 5 producers and pair them up with a consumer on a queue, and they
start sending PERSISTENT messages.
+I've set the producer to send 100 messages and disconnect, and the consumer to
receive 100 messages and disconnect.
+The first pair usually gets through its 100 messages and disconnects, at
which point all the other pairs have
+deadlocked at fewer than 30 messages each.
+ */
+/**
+ * Regression test for AMQ-2356: runs producer/consumer pairs, one after the
+ * other, against a broker whose default destination policy enables
+ * optimizedDispatch, and verifies that every message sent is received.
+ */
+public class AMQ2356Test extends TestCase {
+    /** Number of messages each producer sends and each consumer must receive. */
+    protected static final int MESSAGE_COUNT = 1000;
+    /** Number of producer/consumer pairs (one queue per pair) exercised by the test. */
+    protected static final int NUMBER_OF_PAIRS = 10;
+    /**
+     * Upper bound, in milliseconds, on the wait for a single message. Without a
+     * bound a dispatch deadlock would hang the build instead of failing the test.
+     */
+    protected static final long RECEIVE_TIMEOUT_MS = 30 * 1000;
+    private static final Log LOG = LogFactory.getLog(AMQ2356Test.class);
+    protected BrokerService broker;
+    protected String brokerURL = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
+    protected int destinationCount;
+
+    /**
+     * Sends and receives {@link #MESSAGE_COUNT} messages on each of
+     * {@link #NUMBER_OF_PAIRS} queues, one producer/consumer pair at a time.
+     */
+    public void testScenario() throws Exception {
+        for (int i = 0; i < NUMBER_OF_PAIRS; i++) {
+            ActiveMQQueue queue = new ActiveMQQueue(getClass().getName() + ":" + i);
+            ProducerConsumerPair cp = new ProducerConsumerPair();
+            try {
+                cp.start(this.brokerURL, queue, MESSAGE_COUNT);
+                cp.testRun();
+            } finally {
+                // Always release the connections, even when start() or testRun() fails.
+                cp.stop();
+            }
+        }
+    }
+
+    /**
+     * Creates a queue with a name unique within this test instance.
+     *
+     * @param session the session used to create the queue
+     * @return a queue named after this class plus a running counter
+     * @throws JMSException if the session cannot create the queue
+     */
+    protected Destination getDestination(Session session) throws JMSException {
+        String destinationName = getClass().getName() + "." + destinationCount++;
+        return session.createQueue(destinationName);
+    }
+
+    /** Starts the broker (if not already created) before the test runs. */
+    @Override
+    protected void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+        }
+        super.setUp();
+    }
+
+    /** Stops the broker, even if the superclass tear-down fails. */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            if (broker != null) {
+                broker.stop();
+                broker = null;
+            }
+        }
+    }
+
+    /**
+     * Creates, configures and starts the broker under test.
+     *
+     * @return the started broker
+     */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        configureBroker(answer);
+        answer.start();
+        return answer;
+    }
+
+    /**
+     * Applies the configuration from the bug report: optimizedDispatch enabled
+     * for all destinations, with JMX, advisories and statistics switched off.
+     *
+     * @param answer the broker to configure; must not have been started yet
+     */
+    protected void configureBroker(BrokerService answer) throws Exception {
+        File dataFileDir = new File("target/test-amq-data/bugs/AMQ2356/kahadb");
+        // NOTE(review): this store is configured but never handed to the broker,
+        // so the broker runs with its default persistence adapter. Presumably
+        // answer.setPersistenceAdapter(kaha) was intended - confirm before wiring
+        // it in, since that changes which store the test exercises.
+        KahaDBStore kaha = new KahaDBStore();
+        kaha.setDirectory(dataFileDir);
+        answer.setUseJmx(false);
+        // Set up a default destination policy with optimizedDispatch enabled,
+        // the setting the bug report was filed against.
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policy = new PolicyEntry();
+        policy.setOptimizedDispatch(true);
+        policyMap.setDefaultEntry(policy);
+        answer.setDestinationPolicy(policyMap);
+
+        answer.setAdvisorySupport(false);
+        answer.setEnableStatistics(false);
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.addConnector(brokerURL);
+    }
+
+    /**
+     * One producer and one consumer, each on its own connection, bound to the
+     * same destination.
+     */
+    static class ProducerConsumerPair {
+        private Destination destination;
+        private MessageProducer producer;
+        private MessageConsumer consumer;
+        private Connection producerConnection;
+        private Connection consumerConnection;
+        private int numberOfMessages;
+
+        ProducerConsumerPair() {
+        }
+
+        /**
+         * Opens and starts both connections and creates the producer and consumer.
+         *
+         * @param brokerURL URL of the broker to connect to
+         * @param dest      destination shared by the producer and the consumer
+         * @param msgNum    number of messages {@link #testRun()} sends and expects
+         */
+        void start(String brokerURL, final Destination dest, int msgNum) throws Exception {
+            this.destination = dest;
+            this.numberOfMessages = msgNum;
+            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerURL);
+            this.producerConnection = cf.createConnection();
+            this.producerConnection.start();
+            this.consumerConnection = cf.createConnection();
+            this.consumerConnection.start();
+            this.producer = createProducer(this.producerConnection);
+            this.consumer = createConsumer(this.consumerConnection);
+        }
+
+        /**
+         * Sends all messages, then receives them, failing if any message does not
+         * arrive within {@link AMQ2356Test#RECEIVE_TIMEOUT_MS}.
+         */
+        void testRun() throws Exception {
+            Session s = this.producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            for (int i = 0; i < this.numberOfMessages; i++) {
+                BytesMessage msg = s.createBytesMessage();
+                msg.writeBytes(new byte[1024]);
+                this.producer.send(msg);
+            }
+            int received = 0;
+            for (int i = 0; i < this.numberOfMessages; i++) {
+                // Bounded receive: a stalled queue must fail the test, not hang it.
+                Message msg = this.consumer.receive(RECEIVE_TIMEOUT_MS);
+                assertNotNull("Timed out waiting for message " + i + " on " + this.destination, msg);
+                received++;
+            }
+            assertEquals("Messages received on " + this.destination, this.numberOfMessages, received);
+        }
+
+        /** Closes both connections; the consumer connection is closed even if closing the producer fails. */
+        void stop() throws Exception {
+            try {
+                if (this.producerConnection != null) {
+                    this.producerConnection.close();
+                }
+            } finally {
+                if (this.consumerConnection != null) {
+                    this.consumerConnection.close();
+                }
+            }
+        }
+
+        /** Creates a producer for the pair's destination on a new AUTO_ACKNOWLEDGE session. */
+        private MessageProducer createProducer(Connection connection) throws Exception {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer result = session.createProducer(this.destination);
+            return result;
+        }
+
+        /** Creates a consumer for the pair's destination on a new AUTO_ACKNOWLEDGE session. */
+        private MessageConsumer createConsumer(Connection connection) throws Exception {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer result = session.createConsumer(this.destination);
+            return result;
+        }
+    }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2356Test.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java?rev=811425&r1=811424&r2=811425&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ExpiredMessagesWithNoConsumerTest.java
Fri Sep 4 14:25:18 2009
@@ -76,6 +76,7 @@
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
+ defaultEntry.setOptimizedDispatch(true);
defaultEntry.setExpireMessagesPeriod(100);
defaultEntry.setMaxExpirePageSize(800);