Author: rajdavies
Date: Thu Aug 14 01:24:11 2008
New Revision: 685806

URL: http://svn.apache.org/viewvc?rev=685806&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1866

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=685806&r1=685805&r2=685806&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
 Thu Aug 14 01:24:11 2008
@@ -37,7 +37,7 @@
      * The default number of messages to page in to the destination
      * from persistent storage
      */
-    public static final int DEFAULT_PAGE_SIZE=100;
+    public static final int DEFAULT_PAGE_SIZE=200;
    
     protected final ActiveMQDestination destination;
     protected final Broker broker;

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=685806&r1=685805&r2=685806&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 Thu Aug 14 01:24:11 2008
@@ -378,9 +378,9 @@
             }
         }
         if (callDispatchMatched && destination != null) {
-            if (destination.isLazyDispatch()) {
+//            if (destination.isLazyDispatch()) {
                 destination.wakeup();
-            }
+//            }
             dispatchPending();
         } else {
             if (isSlave()) {

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=685806&r1=685805&r2=685806&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 Thu Aug 14 01:24:11 2008
@@ -81,6 +81,8 @@
     protected final List<Subscription> consumers = new 
ArrayList<Subscription>(50);
     protected PendingMessageCursor messages;
     private final LinkedHashMap<MessageId,QueueMessageReference> 
pagedInMessages = new LinkedHashMap<MessageId,QueueMessageReference>();
+    // Messages that are paged in but have not yet been targeted at a 
subscription
+    private List<QueueMessageReference> pagedInPendingDispatch = new 
ArrayList<QueueMessageReference>(100);
     private MessageGroupMap messageGroupOwners;
     private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
     private MessageGroupMapFactory messageGroupMapFactory = new 
MessageGroupHashBucketFactory();
@@ -317,6 +319,7 @@
     }
 
     public void send(final ProducerBrokerExchange producerExchange, final 
Message message) throws Exception {
+//        System.out.println(getName()+" send "+message.getMessageId());
         final ConnectionContext context = 
producerExchange.getConnectionContext();
         // There is delay between the client sending it and it arriving at the
         // destination.. it may have expired.
@@ -946,6 +949,18 @@
                    result = !messages.isEmpty();
                }               
                
+               // Kinda ugly.. but I think dispatchLock is the only mutex 
protecting the 
+               // pagedInPendingDispatch variable.             
+               dispatchLock.lock();
+               try {
+                   result |= !pagedInPendingDispatch.isEmpty();
+               } finally {
+                   dispatchLock.unlock();
+               }
+               
+               // Perhaps we should page always into the 
pagedInPendingDispatch list is 
+                // !messages.isEmpty(), and then if 
!pagedInPendingDispatch.isEmpty()
+                // then we do a dispatch.
                if (result) {
                    try {
                       pageInMessages(false);
@@ -1134,58 +1149,76 @@
     }
     
     private void doDispatch(List<QueueMessageReference> list) throws Exception 
{
-        if (list != null) {
-            List<Subscription> consumers;
-            dispatchLock.lock();
-            try {
-                synchronized (this.consumers) {
-                    consumers = new ArrayList<Subscription>(this.consumers);
+        dispatchLock.lock();
+        try {
+            if(!pagedInPendingDispatch.isEmpty()) {
+//                System.out.println(getName()+": dispatching from pending: 
"+pagedInPendingDispatch.size());
+                // Try to first dispatch anything that had not been dispatched 
before.
+                pagedInPendingDispatch = 
doActualDispatch(pagedInPendingDispatch);
+//                System.out.println(getName()+": new pending list1: 
"+pagedInPendingDispatch.size());
+            }
+            // and now see if we can dispatch the new stuff.. and append to 
the pending 
+            // list anything that does not actually get dispatched.
+            if (list != null && !list.isEmpty()) {
+//                System.out.println(getName()+": dispatching from paged in: 
"+list.size());
+                pagedInPendingDispatch.addAll(doActualDispatch(list));
+//                System.out.println(getName()+": new pending list2: 
"+pagedInPendingDispatch.size());
+            }
+        } finally {
+            dispatchLock.unlock();
+        }
+    }
+
+    /**
+     * @return list of messages that could get dispatched to consumers if they 
were not full.
+     */
+    private List<QueueMessageReference> 
doActualDispatch(List<QueueMessageReference> list) throws Exception {
+        List<QueueMessageReference> rc = new 
ArrayList<QueueMessageReference>(list.size());
+        List<Subscription> consumers;
+        
+        synchronized (this.consumers) {
+            consumers = new ArrayList<Subscription>(this.consumers);
+        }
+
+        for (MessageReference node : list) {
+            Subscription target = null;
+            int interestCount=0;
+            for (Subscription s : consumers) {
+                if (dispatchSelector.canSelect(s, node)) {
+                    if (!s.isFull()) {
+                        // Dispatch it.
+                        s.add(node);
+//                        System.out.println(getName()+" Dispatched to 
"+s.getConsumerInfo().getConsumerId()+", "+node.getMessageId());
+                        target = s;
+                        break;
+                    } 
+                    interestCount++;
                 }
+            }
             
-            
-                for (MessageReference node : list) {
-                    Subscription target = null;
-                    List<Subscription> targets = null;
-                    for (Subscription s : consumers) {
-                        if (dispatchSelector.canSelect(s, node)) {
-                            if (!s.isFull()) {
-                                s.add(node);
-                                target = s;
-                                break;
-                            } else {
-                                if (targets == null) {
-                                    targets = new ArrayList<Subscription>();
-                                }
-                                targets.add(s);
-                            }
-                        }
-                    }
-                    if (target == null && targets != null) {
-                        // pick the least loaded to add the message too
-                        for (Subscription s : targets) {
-                            if (target == null
-                                    || target.getPendingQueueSize() > 
s.getPendingQueueSize()) {
-                                target = s;
-                            }
-                        }
-                        if (target != null) {
-                            target.add(node);
-                        }
-                    }
-                    if (target != null && !strictOrderDispatch && 
consumers.size() > 1 &&
-                            !dispatchSelector.isExclusiveConsumer(target)) {
-                        synchronized (this.consumers) {
-                            if( removeFromConsumerList(target) ) {
-                                addToConsumerList(target);
-                                consumers = new 
ArrayList<Subscription>(this.consumers);
-                            }
-                        }
+            if (target == null && interestCount>0) {
+                // This means all subs were full...
+                rc.add((QueueMessageReference)node);
+            }
+
+            // If it got dispatched, rotate the consumer list to get round 
robin distribution. 
+            if (target != null && !strictOrderDispatch && consumers.size() > 1 
&&
+                    !dispatchSelector.isExclusiveConsumer(target)) {
+                synchronized (this.consumers) {
+                    if( removeFromConsumerList(target) ) {
+                        addToConsumerList(target);
+                        consumers = new 
ArrayList<Subscription>(this.consumers);
                     }
                 }
-            } finally {
-                dispatchLock.unlock();
             }
         }
+
+        //LOG.info(getName()+" Pending messages:");
+        //for (MessageReference n : rc) {
+       //     LOG.info(getName()+"  - " + n.getMessageId());
+       // }
+        
+        return rc;
     }
 
     private void pageInMessages() throws Exception {


Reply via email to