Author: rajdavies
Date: Thu Aug 14 01:24:11 2008
New Revision: 685806
URL: http://svn.apache.org/viewvc?rev=685806&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1866
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=685806&r1=685805&r2=685806&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Thu Aug 14 01:24:11 2008
@@ -37,7 +37,7 @@
* The default number of messages to page in to the destination
* from persistent storage
*/
- public static final int DEFAULT_PAGE_SIZE=100;
+ public static final int DEFAULT_PAGE_SIZE=200;
protected final ActiveMQDestination destination;
protected final Broker broker;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=685806&r1=685805&r2=685806&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Thu Aug 14 01:24:11 2008
@@ -378,9 +378,9 @@
}
}
if (callDispatchMatched && destination != null) {
- if (destination.isLazyDispatch()) {
+// if (destination.isLazyDispatch()) {
destination.wakeup();
- }
+// }
dispatchPending();
} else {
if (isSlave()) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=685806&r1=685805&r2=685806&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Aug 14 01:24:11 2008
@@ -81,6 +81,8 @@
protected final List<Subscription> consumers = new
ArrayList<Subscription>(50);
protected PendingMessageCursor messages;
private final LinkedHashMap<MessageId,QueueMessageReference>
pagedInMessages = new LinkedHashMap<MessageId,QueueMessageReference>();
+ // Messages that are paged in but have not yet been targeted at a
subscription
+ private List<QueueMessageReference> pagedInPendingDispatch = new
ArrayList<QueueMessageReference>(100);
private MessageGroupMap messageGroupOwners;
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
private MessageGroupMapFactory messageGroupMapFactory = new
MessageGroupHashBucketFactory();
@@ -317,6 +319,7 @@
}
public void send(final ProducerBrokerExchange producerExchange, final
Message message) throws Exception {
+// System.out.println(getName()+" send "+message.getMessageId());
final ConnectionContext context =
producerExchange.getConnectionContext();
// There is delay between the client sending it and it arriving at the
// destination.. it may have expired.
@@ -946,6 +949,18 @@
result = !messages.isEmpty();
}
+ // Kinda ugly.. but I think dispatchLock is the only mutex
protecting the
+ // pagedInPendingDispatch variable.
+ dispatchLock.lock();
+ try {
+ result |= !pagedInPendingDispatch.isEmpty();
+ } finally {
+ dispatchLock.unlock();
+ }
+
+ // Perhaps we should page always into the
pagedInPendingDispatch list is
+ // !messages.isEmpty(), and then if
!pagedInPendingDispatch.isEmpty()
+ // then we do a dispatch.
if (result) {
try {
pageInMessages(false);
@@ -1134,58 +1149,76 @@
}
private void doDispatch(List<QueueMessageReference> list) throws Exception
{
- if (list != null) {
- List<Subscription> consumers;
- dispatchLock.lock();
- try {
- synchronized (this.consumers) {
- consumers = new ArrayList<Subscription>(this.consumers);
+ dispatchLock.lock();
+ try {
+ if(!pagedInPendingDispatch.isEmpty()) {
+// System.out.println(getName()+": dispatching from pending:
"+pagedInPendingDispatch.size());
+ // Try to first dispatch anything that had not been dispatched
before.
+ pagedInPendingDispatch =
doActualDispatch(pagedInPendingDispatch);
+// System.out.println(getName()+": new pending list1:
"+pagedInPendingDispatch.size());
+ }
+ // and now see if we can dispatch the new stuff.. and append to
the pending
+ // list anything that does not actually get dispatched.
+ if (list != null && !list.isEmpty()) {
+// System.out.println(getName()+": dispatching from paged in:
"+list.size());
+ pagedInPendingDispatch.addAll(doActualDispatch(list));
+// System.out.println(getName()+": new pending list2:
"+pagedInPendingDispatch.size());
+ }
+ } finally {
+ dispatchLock.unlock();
+ }
+ }
+
+ /**
+ * @return list of messages that could get dispatched to consumers if they
were not full.
+ */
+ private List<QueueMessageReference>
doActualDispatch(List<QueueMessageReference> list) throws Exception {
+ List<QueueMessageReference> rc = new
ArrayList<QueueMessageReference>(list.size());
+ List<Subscription> consumers;
+
+ synchronized (this.consumers) {
+ consumers = new ArrayList<Subscription>(this.consumers);
+ }
+
+ for (MessageReference node : list) {
+ Subscription target = null;
+ int interestCount=0;
+ for (Subscription s : consumers) {
+ if (dispatchSelector.canSelect(s, node)) {
+ if (!s.isFull()) {
+ // Dispatch it.
+ s.add(node);
+// System.out.println(getName()+" Dispatched to
"+s.getConsumerInfo().getConsumerId()+", "+node.getMessageId());
+ target = s;
+ break;
+ }
+ interestCount++;
}
+ }
-
- for (MessageReference node : list) {
- Subscription target = null;
- List<Subscription> targets = null;
- for (Subscription s : consumers) {
- if (dispatchSelector.canSelect(s, node)) {
- if (!s.isFull()) {
- s.add(node);
- target = s;
- break;
- } else {
- if (targets == null) {
- targets = new ArrayList<Subscription>();
- }
- targets.add(s);
- }
- }
- }
- if (target == null && targets != null) {
- // pick the least loaded to add the message too
- for (Subscription s : targets) {
- if (target == null
- || target.getPendingQueueSize() >
s.getPendingQueueSize()) {
- target = s;
- }
- }
- if (target != null) {
- target.add(node);
- }
- }
- if (target != null && !strictOrderDispatch &&
consumers.size() > 1 &&
- !dispatchSelector.isExclusiveConsumer(target)) {
- synchronized (this.consumers) {
- if( removeFromConsumerList(target) ) {
- addToConsumerList(target);
- consumers = new
ArrayList<Subscription>(this.consumers);
- }
- }
+ if (target == null && interestCount>0) {
+ // This means all subs were full...
+ rc.add((QueueMessageReference)node);
+ }
+
+ // If it got dispatched, rotate the consumer list to get round
robin distribution.
+ if (target != null && !strictOrderDispatch && consumers.size() > 1
&&
+ !dispatchSelector.isExclusiveConsumer(target)) {
+ synchronized (this.consumers) {
+ if( removeFromConsumerList(target) ) {
+ addToConsumerList(target);
+ consumers = new
ArrayList<Subscription>(this.consumers);
}
}
- } finally {
- dispatchLock.unlock();
}
}
+
+ //LOG.info(getName()+" Pending messages:");
+ //for (MessageReference n : rc) {
+ // LOG.info(getName()+" - " + n.getMessageId());
+ // }
+
+ return rc;
}
private void pageInMessages() throws Exception {