Author: rajdavies
Date: Wed Sep 24 06:51:18 2008
New Revision: 698573
URL: http://svn.apache.org/viewvc?rev=698573&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1947
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=698573&r1=698572&r2=698573&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Wed Sep 24 06:51:18 2008
@@ -18,13 +18,11 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
@@ -86,7 +84,7 @@
protected PendingMessageCursor messages;
private final LinkedHashMap<MessageId,QueueMessageReference>
pagedInMessages = new LinkedHashMap<MessageId,QueueMessageReference>();
// Messages that are paged in but have not yet been targeted at a
subscription
- private LinkedHashSet<QueueMessageReference> pagedInPendingDispatch = new
LinkedHashSet<QueueMessageReference>(100);
+ private List<QueueMessageReference> pagedInPendingDispatch = new
ArrayList<QueueMessageReference>(100);
private MessageGroupMap messageGroupOwners;
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
private MessageGroupMapFactory messageGroupMapFactory = new
MessageGroupHashBucketFactory();
@@ -968,7 +966,9 @@
dispatchLock.lock();
try {
synchronized(pagedInPendingDispatch) {
- pagedInPendingDispatch.add(node);
+ if
(!pagedInPendingDispatch.contains(node)) {
+
pagedInPendingDispatch.add(node);
+ }
}
} finally {
dispatchLock.unlock();
@@ -1219,7 +1219,15 @@
// the pending
// list anything that does not actually get dispatched.
if (list != null && !list.isEmpty()) {
- pagedInPendingDispatch.addAll(doActualDispatch(list));
+ if (pagedInPendingDispatch.isEmpty()) {
+ pagedInPendingDispatch.addAll(doActualDispatch(list));
+ } else {
+ for (QueueMessageReference qmr : list) {
+ if (!pagedInPendingDispatch.contains(qmr)) {
+ pagedInPendingDispatch.add(qmr);
+ }
+ }
+ }
}
}
} finally {
@@ -1231,8 +1239,8 @@
* @return list of messages that could get dispatched to consumers if they
* were not full.
*/
- private LinkedHashSet<QueueMessageReference>
doActualDispatch(Collection<QueueMessageReference> collection) throws Exception
{
- LinkedHashSet<QueueMessageReference> rc = new
LinkedHashSet<QueueMessageReference>(collection.size());
+ private List<QueueMessageReference>
doActualDispatch(List<QueueMessageReference> list) throws Exception {
+ List<QueueMessageReference> rc = new
ArrayList<QueueMessageReference>(list.size());
Set<Subscription> fullConsumers = new
HashSet<Subscription>(this.consumers.size());
List<Subscription> consumers;
@@ -1240,7 +1248,7 @@
consumers = new ArrayList<Subscription>(this.consumers);
}
- for (MessageReference node : collection) {
+ for (MessageReference node : list) {
Subscription target = null;
int interestCount=0;
for (Subscription s : consumers) {