Author: tabish
Date: Thu Dec 22 15:15:48 2011
New Revision: 1222275
URL: http://svn.apache.org/viewvc?rev=1222275&view=rev
Log:
apply patch and add test for: https://issues.apache.org/jira/browse/AMQ-3436
Some modifications needed for the patch to work correctly.
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1222275&r1=1222274&r2=1222275&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Dec 22 15:15:48 2011
@@ -40,13 +40,18 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.ResourceAllocationException;
+
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.region.cursors.OrderedPendingList;
+import org.apache.activemq.broker.region.cursors.PendingList;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.PrioritizedPendingList;
import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
@@ -92,8 +97,8 @@ public class Queue extends BaseDestinati
// Messages that are paged in but have not yet been targeted at a
// subscription
private final ReentrantReadWriteLock pagedInPendingDispatchLock = new
ReentrantReadWriteLock();
- private List<QueueMessageReference> pagedInPendingDispatch = new
ArrayList<QueueMessageReference>(100);
- private List<QueueMessageReference> redeliveredWaitingDispatch = new
ArrayList<QueueMessageReference>();
+ protected PendingList pagedInPendingDispatch = new OrderedPendingList();
+ protected PendingList redeliveredWaitingDispatch = new
OrderedPendingList();
private MessageGroupMap messageGroupOwners;
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
private MessageGroupMapFactory messageGroupMapFactory = new
MessageGroupHashBucketFactory();
@@ -123,9 +128,7 @@ public class Queue extends BaseDestinati
}
};
- private final Object iteratingMutex = new Object() {
- };
-
+ private final Object iteratingMutex = new Object();
class TimeoutMessage implements Delayed {
@@ -305,7 +308,21 @@ public class Queue extends BaseDestinati
}
@Override
+ public void setPrioritizedMessages(boolean prioritizedMessages) {
+ super.setPrioritizedMessages(prioritizedMessages);
+
+ if (prioritizedMessages && this.pagedInPendingDispatch instanceof
OrderedPendingList) {
+ pagedInPendingDispatch = new PrioritizedPendingList();
+ redeliveredWaitingDispatch = new PrioritizedPendingList();
+ } else if(pagedInPendingDispatch instanceof PrioritizedPendingList) {
+ pagedInPendingDispatch = new OrderedPendingList();
+ redeliveredWaitingDispatch = new OrderedPendingList();
+ }
+ }
+
+ @Override
public void initialize() throws Exception {
+
if (this.messages == null) {
if (destination.isTemporary() || broker == null || store == null) {
this.messages = new
VMPendingMessageCursor(isPrioritizedMessages());
@@ -313,6 +330,7 @@ public class Queue extends BaseDestinati
this.messages = new StoreQueueCursor(broker, this);
}
}
+
// If a VMPendingMessageCursor don't use the default Producer System
// Usage
// since it turns into a shared blocking queue which can lead to a
@@ -529,10 +547,10 @@ public class Queue extends BaseDestinati
}
}
}
- redeliveredWaitingDispatch.add(qmr);
+ redeliveredWaitingDispatch.addMessageLast(qmr);
}
if (!redeliveredWaitingDispatch.isEmpty()) {
- doDispatch(new ArrayList<QueueMessageReference>());
+ doDispatch(new OrderedPendingList());
}
}finally {
consumersLock.writeLock().unlock();
@@ -994,7 +1012,7 @@ public class Queue extends BaseDestinati
pagedInPendingDispatchLock.writeLock().lock();
try {
- addAll(pagedInPendingDispatch, browseList, max, toExpire);
+ addAll(pagedInPendingDispatch.values(), browseList, max,
toExpire);
for (MessageReference ref : toExpire) {
pagedInPendingDispatch.remove(ref);
if (broker.isExpired(ref)) {
@@ -1066,10 +1084,10 @@ public class Queue extends BaseDestinati
}
}
- private void addAll(Collection<QueueMessageReference> refs, List<Message>
l, int maxBrowsePageSize,
+ private void addAll(Collection<? extends MessageReference> refs,
List<Message> l, int maxBrowsePageSize,
List<MessageReference> toExpire) throws Exception {
- for (Iterator<QueueMessageReference> i = refs.iterator(); i.hasNext()
&& l.size() < getMaxBrowsePageSize();) {
- QueueMessageReference ref = i.next();
+ for (Iterator<? extends MessageReference> i = refs.iterator();
i.hasNext() && l.size() < getMaxBrowsePageSize();) {
+ QueueMessageReference ref = (QueueMessageReference) i.next();
if (ref.isExpired()) {
toExpire.add(ref);
} else if (l.contains(ref.getMessage()) == false) {
@@ -1675,15 +1693,16 @@ public class Queue extends BaseDestinati
}
private void doPageIn(boolean force) throws Exception {
- List<QueueMessageReference> newlyPaged = doPageInForDispatch(force);
+ PendingList newlyPaged = doPageInForDispatch(force);
pagedInPendingDispatchLock.writeLock().lock();
try {
if (pagedInPendingDispatch.isEmpty()) {
pagedInPendingDispatch.addAll(newlyPaged);
+
} else {
- for (QueueMessageReference qmr : newlyPaged) {
+ for (MessageReference qmr : newlyPaged) {
if (!pagedInPendingDispatch.contains(qmr)) {
- pagedInPendingDispatch.add(qmr);
+ pagedInPendingDispatch.addMessageLast(qmr);
}
}
}
@@ -1692,9 +1711,9 @@ public class Queue extends BaseDestinati
}
}
- private List<QueueMessageReference> doPageInForDispatch(boolean force)
throws Exception {
+ private PendingList doPageInForDispatch(boolean force) throws Exception {
List<QueueMessageReference> result = null;
- List<QueueMessageReference> resultList = null;
+ PendingList resultList = null;
int toPageIn = Math.min(getMaxPageSize(), messages.size());
if (LOG.isDebugEnabled()) {
@@ -1750,11 +1769,15 @@ public class Queue extends BaseDestinati
// dispatch attempts
pagedInMessagesLock.writeLock().lock();
try {
- resultList = new
ArrayList<QueueMessageReference>(result.size());
+ if(isPrioritizedMessages()) {
+ resultList = new PrioritizedPendingList();
+ } else {
+ resultList = new OrderedPendingList();
+ }
for (QueueMessageReference ref : result) {
if (!pagedInMessages.containsKey(ref.getMessageId())) {
pagedInMessages.put(ref.getMessageId(), ref);
- resultList.add(ref);
+ resultList.addMessageLast(ref);
} else {
ref.decrementReferenceCount();
}
@@ -1764,13 +1787,13 @@ public class Queue extends BaseDestinati
}
} else {
// Avoid return null list, if condition is not validated
- resultList = new ArrayList<QueueMessageReference>();
+ resultList = new OrderedPendingList();
}
return resultList;
}
- private void doDispatch(List<QueueMessageReference> list) throws Exception
{
+ private void doDispatch(PendingList list) throws Exception {
boolean doWakeUp = false;
pagedInPendingDispatchLock.writeLock().lock();
@@ -1792,9 +1815,9 @@ public class Queue extends BaseDestinati
if (pagedInPendingDispatch.isEmpty()) {
pagedInPendingDispatch.addAll(doActualDispatch(list));
} else {
- for (QueueMessageReference qmr : list) {
+ for (MessageReference qmr : list) {
if (!pagedInPendingDispatch.contains(qmr)) {
- pagedInPendingDispatch.add(qmr);
+ pagedInPendingDispatch.addMessageLast(qmr);
}
}
doWakeUp = true;
@@ -1814,9 +1837,10 @@ public class Queue extends BaseDestinati
* @return list of messages that could get dispatched to consumers if they
* were not full.
*/
- private List<QueueMessageReference>
doActualDispatch(List<QueueMessageReference> list) throws Exception {
+ private PendingList doActualDispatch(PendingList list) throws Exception {
List<Subscription> consumers;
consumersLock.writeLock().lock();
+
try {
if (this.consumers.isEmpty() || isSlave()) {
// slave dispatch happens in processDispatchNotification
@@ -1827,10 +1851,18 @@ public class Queue extends BaseDestinati
consumersLock.writeLock().unlock();
}
- List<QueueMessageReference> rc = new
ArrayList<QueueMessageReference>(list.size());
+ PendingList rc;
+ if(isPrioritizedMessages()) {
+ rc = new PrioritizedPendingList();
+ } else {
+ rc = new OrderedPendingList();
+ }
+
Set<Subscription> fullConsumers = new
HashSet<Subscription>(this.consumers.size());
- for (MessageReference node : list) {
+ for (Iterator<MessageReference> iterator = list.iterator();
iterator.hasNext();) {
+
+ MessageReference node = (MessageReference) iterator.next();
Subscription target = null;
int interestCount = 0;
for (Subscription s : consumers) {
@@ -1863,7 +1895,7 @@ public class Queue extends BaseDestinati
if ((target == null && interestCount > 0) || consumers.size() ==
0) {
// This means all subs were full or that there are no
// consumers...
- rc.add((QueueMessageReference) node);
+ rc.addMessageLast((QueueMessageReference) node);
}
// If it got dispatched, rotate the consumer list to get round
robin
@@ -1886,7 +1918,6 @@ public class Queue extends BaseDestinati
}
protected boolean assignMessageGroup(Subscription subscription,
QueueMessageReference node) throws Exception {
- //QueueMessageReference node = (QueueMessageReference) m;
boolean result = true;
// Keep message groups together.
String groupId = node.getGroupID();
@@ -2002,9 +2033,9 @@ public class Queue extends BaseDestinati
pagedInPendingDispatchLock.writeLock().lock();
try {
- for (QueueMessageReference ref : pagedInPendingDispatch) {
+ for (MessageReference ref : pagedInPendingDispatch) {
if (messageId.equals(ref.getMessageId())) {
- message = ref;
+ message = (QueueMessageReference)ref;
pagedInPendingDispatch.remove(ref);
break;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java?rev=1222275&r1=1222274&r2=1222275&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/OrderedPendingList.java
Thu Dec 22 15:15:48 2011
@@ -17,6 +17,7 @@
package org.apache.activemq.broker.region.cursors;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -25,9 +26,10 @@ import org.apache.activemq.broker.region
import org.apache.activemq.command.MessageId;
public class OrderedPendingList implements PendingList {
- PendingNode root = null;
- PendingNode tail = null;
- final Map<MessageId, PendingNode> map = new HashMap<MessageId,
PendingNode>();
+
+ private PendingNode root = null;
+ private PendingNode tail = null;
+ private final Map<MessageId, PendingNode> map = new HashMap<MessageId,
PendingNode>();
public PendingNode addMessageFirst(MessageReference message) {
PendingNode node = new PendingNode(this, message);
@@ -130,4 +132,28 @@ public class OrderedPendingList implemen
return "OrderedPendingList(" + System.identityHashCode(this) + ")";
}
+ /**
+ * Reports whether the given message is currently held in this list.
+ *
+ * @param message
+ * The MessageReference to look for; a null message is never contained.
+ *
+ * @return true if an entry for the message's id is present in this list.
+ */
+ @Override
+ public boolean contains(MessageReference message) {
+ // The map is keyed by MessageId and its values are PendingNodes, so a
+ // MessageReference can only be found through its id key; scanning
+ // map.values() for the message itself never matches.
+ if (message == null) {
+ return false;
+ }
+ return map.containsKey(message.getMessageId());
+ }
+
+ /**
+ * Copies the pending messages into a new Collection, in the order in
+ * which this list's iterator returns them.
+ *
+ * @return a new Collection holding this list's MessageReferences.
+ */
+ @Override
+ public Collection<MessageReference> values() {
+ List<MessageReference> messageReferences = new ArrayList<MessageReference>();
+ // Walk the list through its own iterator rather than map.values(): a
+ // HashMap has no defined ordering, while PendingList.values() promises
+ // the same ordering as iteration.
+ for (MessageReference messageReference : this) {
+ messageReferences.add(messageReference);
+ }
+ return messageReferences;
+ }
+
+ /**
+ * Appends every message of the given PendingList to the tail of this
+ * list, in the order the given list's iterator returns them.
+ *
+ * @param pendingList
+ * The PendingList whose messages are to be added to this list.
+ */
+ @Override
+ public void addAll(PendingList pendingList) {
+ Iterator<MessageReference> incoming = pendingList.iterator();
+ while (incoming.hasNext()) {
+ addMessageLast(incoming.next());
+ }
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java?rev=1222275&r1=1222274&r2=1222275&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingList.java
Thu Dec 22 15:15:48 2011
@@ -16,16 +16,96 @@
*/
package org.apache.activemq.broker.region.cursors;
+import java.util.Collection;
import java.util.Iterator;
+
import org.apache.activemq.broker.region.MessageReference;
-public interface PendingList {
-
+public interface PendingList extends Iterable<MessageReference> {
+
+ /**
+ * Returns true if there are no Messages in the PendingList currently.
+ * @return true if the PendingList is currently empty.
+ */
public boolean isEmpty();
+
+ /**
+ * Discards all Messages currently held in the PendingList.
+ */
public void clear();
+
+ /**
+ * Adds the given message to the head of the list.
+ *
+ * @param message
+ * The MessageReference that is to be added to this list.
+ *
+ * @return the PendingNode that contains the newly added message.
+ */
public PendingNode addMessageFirst(MessageReference message);
+
+ /**
+ * Adds the given message to the tail of the list.
+ *
+ * @param message
+ * The MessageReference that is to be added to this list.
+ *
+ * @return the PendingNode that contains the newly added message.
+ */
public PendingNode addMessageLast(MessageReference message);
+
+ /**
+ * Removes the given MessageReference from the PendingList if it is
+ * contained within.
+ *
+ * @param message
+ * The MessageReference that is to be removed from this list.
+ *
+ * @return the PendingNode that contains the removed message or null if the
+ * message was not present in this list.
+ */
public PendingNode remove(MessageReference message);
+
+ /**
+ * Returns the number of MessageReferences that are awaiting dispatch.
+ * @return current count of the pending messages.
+ */
public int size();
+
+ /**
+ * Returns an iterator over the pending Messages. The subclass controls
how
+ * the returned iterator actually traverses the list of pending messages
allowing
+ * for the order to vary based on factors like Message priority or some
other
+ * mechanism.
+ *
+ * @return an Iterator that returns MessageReferences contained in this
list.
+ */
public Iterator<MessageReference> iterator();
+
+ /**
+ * Query the PendingList to determine if the given message is contained
within.
+ *
+ * @param message
+ * The Message that is the target of this query.
+ *
+ * @return true if the MessageReference is contained in this list.
+ */
+ public boolean contains(MessageReference message);
+
+ /**
+ * Returns a new Collection that contains all the MessageReferences
currently
+ * held in this PendingList. The elements of the list are ordered using
the
+ * same rules as the subclass uses for iteration.
+ *
+ * @return a new Collection containing this list's MessageReferences.
+ */
+ public Collection<MessageReference> values();
+
+ /**
+ * Adds all the elements of the given PendingList to this PendingList.
+ *
+ * @param pendingList
+ * The PendingList that is to be added to this collection.
+ */
+ public void addAll(PendingList pendingList);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java?rev=1222275&r1=1222274&r2=1222275&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
Thu Dec 22 15:15:48 2011
@@ -17,23 +17,27 @@
package org.apache.activemq.broker.region.cursors;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.MessageId;
public class PrioritizedPendingList implements PendingList {
- static final Integer MAX_PRIORITY = 10;
+
+ private static final Integer MAX_PRIORITY = 10;
private final OrderedPendingList[] lists = new
OrderedPendingList[MAX_PRIORITY];
- final Map<MessageId, PendingNode> map = new HashMap<MessageId,
PendingNode>();
+ private final Map<MessageId, PendingNode> map = new HashMap<MessageId,
PendingNode>();
public PrioritizedPendingList() {
for (int i = 0; i < MAX_PRIORITY; i++) {
this.lists[i] = new OrderedPendingList();
}
}
+
public PendingNode addMessageFirst(MessageReference message) {
PendingNode node = getList(message).addMessageFirst(message);
this.map.put(message.getMessageId(), node);
@@ -124,9 +128,32 @@ public class PrioritizedPendingList impl
map.remove(node.getMessage().getMessageId());
node.getList().removeNode(node);
}
+ }
+ }
+
+ /**
+ * Reports whether the given message is currently held in this list.
+ *
+ * @param message
+ * The MessageReference to look for; a null message is never contained.
+ *
+ * @return true if an entry for the message's id is present in this list.
+ */
+ @Override
+ public boolean contains(MessageReference message) {
+ // Nodes are registered in the map under message.getMessageId(), and the
+ // map's values are PendingNodes, so the lookup has to go through the
+ // id key; scanning map.values() for the message itself never matches.
+ if (message == null) {
+ return false;
+ }
+ return map.containsKey(message.getMessageId());
+ }
+ /**
+ * Copies the pending messages into a new Collection, in the order in
+ * which this list's iterator returns them.
+ *
+ * @return a new Collection holding this list's MessageReferences.
+ */
+ @Override
+ public Collection<MessageReference> values() {
+ List<MessageReference> messageReferences = new ArrayList<MessageReference>();
+ // Walk the list through its own iterator rather than map.values(): a
+ // HashMap has no defined ordering, while PendingList.values() promises
+ // the same ordering as iteration.
+ for (MessageReference messageReference : this) {
+ messageReferences.add(messageReference);
}
+ return messageReferences;
+ }
+ /**
+ * Appends every message of the given PendingList to this list, in the
+ * order the given list's iterator returns them.
+ *
+ * @param pendingList
+ * The PendingList whose messages are to be added to this list.
+ */
+ @Override
+ public void addAll(PendingList pendingList) {
+ Iterator<MessageReference> incoming = pendingList.iterator();
+ while (incoming.hasNext()) {
+ addMessageLast(incoming.next());
+ }
}
}
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java?rev=1222275&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java
Thu Dec 22 15:15:48 2011
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.net.URI;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ3436Test {
+
+ protected static final Logger LOG =
LoggerFactory.getLogger(AMQ3436Test.class);
+
+ private BrokerService broker;
+ private PersistenceAdapter adapter;
+ private boolean useCache = true;
+ private boolean prioritizeMessages = true;
+
+ protected PersistenceAdapter createPersistenceAdapter(boolean delete)
throws Exception {
+ KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+ adapter.setConcurrentStoreAndDispatchQueues(false);
+ adapter.setConcurrentStoreAndDispatchTopics(false);
+ adapter.deleteAllMessages();
+ return adapter;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ broker = new BrokerService();
+ broker.setBrokerName("priorityTest");
+ broker.setAdvisorySupport(false);
+ broker.setUseJmx(false);
+ adapter = createPersistenceAdapter(true);
+ broker.setPersistenceAdapter(adapter);
+ PolicyEntry policy = new PolicyEntry();
+ policy.setPrioritizedMessages(prioritizeMessages);
+ policy.setUseCache(useCache);
+ policy.setProducerFlowControl(false);
+ PolicyMap policyMap = new PolicyMap();
+ policyMap.put(new ActiveMQQueue("TEST"), policy);
+
+ // do not process expired for one test
+ PolicyEntry ignoreExpired = new PolicyEntry();
+ SharedDeadLetterStrategy ignoreExpiredStrategy = new
SharedDeadLetterStrategy();
+ ignoreExpiredStrategy.setProcessExpired(false);
+ ignoreExpired.setDeadLetterStrategy(ignoreExpiredStrategy);
+
+ broker.setDestinationPolicy(policyMap);
+ broker.start();
+ broker.waitUntilStarted();
+ }
+
+ /**
+ * Stops the broker started in setUp() and waits until it has shut down.
+ * Annotated with @After and declared public, as JUnit 4 requires, so that
+ * the runner actually invokes it after each test.
+ *
+ * @throws Exception if the broker fails to stop.
+ */
+ @After
+ public void tearDown() throws Exception {
+ // Guard against setUp() having failed before the broker was created.
+ if (broker != null) {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+ }
+
+ @Test
+ public void testPriorityWhenConsumerCreatedBeforeProduction() throws
Exception {
+
+ int messageCount = 200;
+ URI failoverUri = new
URI("vm://priorityTest?jms.prefetchPolicy.all=1");
+
+ ActiveMQQueue dest = new
ActiveMQQueue("TEST?consumer.dispatchAsync=false");
+
+ ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory(failoverUri);
+ cf.setDispatchAsync(false);
+
+ // Create producer
+ ActiveMQConnection producerConnection = (ActiveMQConnection)
cf.createConnection();
+ producerConnection.setMessagePrioritySupported(true);
+ producerConnection.start();
+ final Session producerSession = producerConnection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageProducer producer = producerSession.createProducer(dest);
+
+ ActiveMQMessageConsumer consumer;
+
+ // Create consumer on separate connection
+ ActiveMQConnection consumerConnection = (ActiveMQConnection)
cf.createConnection();
+ consumerConnection.setMessagePrioritySupported(true);
+ consumerConnection.start();
+ final ActiveMQSession consumerSession = (ActiveMQSession)
consumerConnection.createSession(true,
+ Session.SESSION_TRANSACTED);
+ consumer = (ActiveMQMessageConsumer)
consumerSession.createConsumer(dest);
+
+ // Produce X number of messages with a session commit after each
message
+ Random random = new Random();
+ for (int i = 0; i < messageCount; ++i) {
+
+ Message message = producerSession.createTextMessage("Test message
#" + i);
+ producer.send(message, DeliveryMode.PERSISTENT,
random.nextInt(10), 45*1000);
+ producerSession.commit();
+ }
+ producer.close();
+
+ // ***************************************************
+ // If we create the consumer here instead of above,
+ // the messages will be consumed in priority order
+ // ***************************************************
+ //consumer = (ActiveMQMessageConsumer)
consumerSession.createConsumer(dest);
+
+ // Consume all of the messages we produce using a listener.
+ // Don't exit until we get all the messages.
+ final CountDownLatch latch = new CountDownLatch(messageCount);
+ final StringBuffer failureMessage = new StringBuffer();
+ consumer.setMessageListener(new MessageListener() {
+ int lowestPrioritySeen = 10;
+
+ boolean firstMessage = true;
+
+ public void onMessage(Message msg) {
+ try {
+
+ int currentPriority = msg.getJMSPriority();
+ LOG.debug(currentPriority + "<=" + lowestPrioritySeen);
+
+ // Ignore the first message priority since it is prefetched
+ // and is out of order by design
+ if (firstMessage == true) {
+ firstMessage = false;
+ LOG.debug("Ignoring first message since it was
prefetched");
+
+ } else {
+
+ // Verify that we never see a priority higher than the
+ // lowest
+ // priority seen
+ if (lowestPrioritySeen > currentPriority) {
+ lowestPrioritySeen = currentPriority;
+ }
+ if (lowestPrioritySeen < currentPriority) {
+ failureMessage.append("Incorrect priority seen
(Lowest Priority = " + lowestPrioritySeen
+ + " Current Priority = " + currentPriority
+ ")"
+ + System.getProperty("line.separator"));
+ }
+ }
+
+ } catch (JMSException e) {
+ e.printStackTrace();
+ } finally {
+ latch.countDown();
+ LOG.debug("Messages remaining = " + latch.getCount());
+ }
+ }
+ });
+
+ latch.await();
+ consumer.close();
+
+ // Cleanup producer resources
+ producerSession.close();
+ producerConnection.stop();
+ producerConnection.close();
+
+ // Cleanup consumer resources
+ consumerSession.close();
+ consumerConnection.stop();
+ consumerConnection.close();
+
+ // Report the failure if found
+ if (failureMessage.length() > 0) {
+ Assert.fail(failureMessage.toString());
+ }
+ }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java
------------------------------------------------------------------------------
svn:eol-style = native