Author: chirino
Date: Tue Apr  1 06:22:48 2008
New Revision: 643390

URL: http://svn.apache.org/viewvc?rev=643390&view=rev
Log:
When pulling a message, iterate the destinations first to make sure that they
have pushed all available messages to the subscription. This should fix the
ZeroPrefetchTest that was intermittently failing on slower machines.


Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=643390&r1=643389&r2=643390&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
 Tue Apr  1 06:22:48 2008
@@ -27,12 +27,13 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.thread.Task;
 import org.apache.activemq.usage.MemoryUsage;
 
 /**
  * @version $Revision: 1.12 $
  */
-public interface Destination extends Service {
+public interface Destination extends Service, Task {
 
     void addSubscription(ConnectionContext context, Subscription sub) throws 
Exception;
 

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=643390&r1=643389&r2=643390&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
 Tue Apr  1 06:22:48 2008
@@ -206,4 +206,8 @@
     public void messageExpired(ConnectionContext context, PrefetchSubscription 
prefetchSubscription, MessageReference node) {
         next.messageExpired(context, prefetchSubscription, node);        
     }
+
+       public boolean iterate() {
+               return next.iterate();
+       }
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=643390&r1=643389&r2=643390&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 Tue Apr  1 06:22:48 2008
@@ -79,32 +79,43 @@
     /**
      * Allows a message to be pulled on demand by a client
      */
-    public synchronized Response pullMessage(ConnectionContext context, 
MessagePull pull) throws Exception {
+    public Response pullMessage(ConnectionContext context, MessagePull pull) 
throws Exception {
         // The slave should not deliver pull messages. TODO: when the slave
         // becomes a master,
         // He should send a NULL message to all the consumers to 'wake them up'
         // in case
         // they were waiting for a message.
         if (getPrefetchSize() == 0 && !isSlave()) {
-            prefetchExtension++;
-            final long dispatchCounterBeforePull = dispatchCounter;
-            dispatchPending();
-            // If there was nothing dispatched.. we may need to setup a 
timeout.
-            if (dispatchCounterBeforePull == dispatchCounter) {
-                // imediate timeout used by receiveNoWait()
-                if (pull.getTimeout() == -1) {
-                    // Send a NULL message.
-                    add(QueueMessageReference.NULL_MESSAGE);
-                    dispatchPending();
-                }
-                if (pull.getTimeout() > 0) {
-                    Scheduler.executeAfterDelay(new Runnable() {
-
-                        public void run() {
-                            pullTimeout(dispatchCounterBeforePull);
-                        }
-                    }, pull.getTimeout());
-                }
+            final long dispatchCounterBeforePull;
+               synchronized(this) {
+                       prefetchExtension++;
+                       dispatchCounterBeforePull = dispatchCounter;
+               }
+            
+               // Have the destination push us some messages.
+               for (Destination dest : destinations) {
+                               dest.iterate();
+                       }
+               dispatchPending();
+            
+            synchronized(this) {
+                   // If there was nothing dispatched.. we may need to setup a 
timeout.
+                   if (dispatchCounterBeforePull == dispatchCounter) {
+                       // imediate timeout used by receiveNoWait()
+                       if (pull.getTimeout() == -1) {
+                           // Send a NULL message.
+                           add(QueueMessageReference.NULL_MESSAGE);
+                           dispatchPending();
+                       }
+                       if (pull.getTimeout() > 0) {
+                           Scheduler.executeAfterDelay(new Runnable() {
+       
+                               public void run() {
+                                   pullTimeout(dispatchCounterBeforePull);
+                               }
+                           }, pull.getTimeout());
+                       }
+                   }
             }
         }
         return null;

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=643390&r1=643389&r2=643390&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 Tue Apr  1 06:22:48 2008
@@ -99,6 +99,7 @@
             wakeup();
         }
     };
+    private final Object iteratingMutex = new Object() {};
     
     private static final Comparator<Subscription>orderedCompare = new 
Comparator<Subscription>() {
 
@@ -914,51 +915,52 @@
      * @see org.apache.activemq.thread.Task#iterate()
      */
     public boolean iterate() {
-        
-        RecoveryDispatch rd;
-        while ((rd = getNextRecoveryDispatch()) != null) {
-            try {
-                MessageEvaluationContext msgContext = new 
NonCachedMessageEvaluationContext();
-                msgContext.setDestination(destination);
-    
-                for (QueueMessageReference node : rd.messages) {
-                    if (!node.isDropped() && !node.isAcked() && 
(!node.isDropped() || rd.subscription.getConsumerInfo().isBrowser())) {
-                        msgContext.setMessageReference(node);
-                            if (rd.subscription.matches(node, msgContext)) {
-                                rd.subscription.add(node);
-                            }
-                    }
-                }
-                
-                if( rd.subscription instanceof QueueBrowserSubscription ) {
-                    
((QueueBrowserSubscription)rd.subscription).decrementQueueRef();
-                }
-                
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-
-        boolean result = false;
-        synchronized (messages) {
-            result = !messages.isEmpty();
-        }               
-        
-        if (result) {
-            try {
-               pageInMessages(false);
-               
-            } catch (Throwable e) {
-                log.error("Failed to page in more queue messages ", e);
-            }
-        }
-        synchronized(messagesWaitingForSpace) {
-               while (!messagesWaitingForSpace.isEmpty() && 
!memoryUsage.isFull()) {
-                   Runnable op = messagesWaitingForSpace.removeFirst();
-                   op.run();
-               }
+        synchronized(iteratingMutex) {
+               RecoveryDispatch rd;
+               while ((rd = getNextRecoveryDispatch()) != null) {
+                   try {
+                       MessageEvaluationContext msgContext = new 
NonCachedMessageEvaluationContext();
+                       msgContext.setDestination(destination);
+           
+                       for (QueueMessageReference node : rd.messages) {
+                           if (!node.isDropped() && !node.isAcked() && 
(!node.isDropped() || rd.subscription.getConsumerInfo().isBrowser())) {
+                               msgContext.setMessageReference(node);
+                                   if (rd.subscription.matches(node, 
msgContext)) {
+                                       rd.subscription.add(node);
+                                   }
+                           }
+                       }
+                       
+                       if( rd.subscription instanceof QueueBrowserSubscription 
) {
+                           
((QueueBrowserSubscription)rd.subscription).decrementQueueRef();
+                       }
+                       
+                   } catch (Exception e) {
+                       e.printStackTrace();
+                   }
+               }
+       
+               boolean result = false;
+               synchronized (messages) {
+                   result = !messages.isEmpty();
+               }               
+               
+               if (result) {
+                   try {
+                      pageInMessages(false);
+                      
+                   } catch (Throwable e) {
+                       log.error("Failed to page in more queue messages ", e);
+                   }
+               }
+               synchronized(messagesWaitingForSpace) {
+                      while (!messagesWaitingForSpace.isEmpty() && 
!memoryUsage.isFull()) {
+                          Runnable op = messagesWaitingForSpace.removeFirst();
+                          op.run();
+                      }
+               }
+               return false;
         }
-        return false;
     }
 
     protected MessageReferenceFilter createMessageIdFilter(final String 
messageId) {


Reply via email to