Author: chirino
Date: Fri Apr 28 12:13:54 2006
New Revision: 397985

URL: http://svn.apache.org/viewcvs?rev=397985&view=rev
Log:
Guard access to the pending list better.

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=397985&r1=397984&r2=397985&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Fri Apr 28 12:13:54 2006
@@ -63,9 +63,7 @@
             Topic topic = (Topic) destination;            
             topic.activate(context, this);
         }
-        if( !isFull() ) {
-            dispatchMatched();
-        }
+        dispatchMatched();
     }
    
     synchronized public void activate(ConnectionContext context, ConsumerInfo info) throws Exception {
@@ -79,9 +77,7 @@
                     topic.activate(context, this);
                 }
             }
-            if( !isFull() ) {
-                dispatchMatched();
-            }
+            dispatchMatched();
         }
     }
 
@@ -104,7 +100,9 @@
                 redeliveredMessages.put(node.getMessageId(), new Integer(1));
             }
             if( keepDurableSubsActive ) {
-                pending.addFirst(node);
+               synchronized(pending) {
+                       pending.addFirst(node);
+               }
             } else {
                 node.decrementReferenceCount();
             }
@@ -112,11 +110,13 @@
         }
         
         if( !keepDurableSubsActive ) {
-            for (Iterator iter = pending.iterator(); iter.hasNext();) {
-                MessageReference node = (MessageReference) iter.next();
-                node.decrementReferenceCount();
-                iter.remove();
-            }
+               synchronized(pending) {
+                   for (Iterator iter = pending.iterator(); iter.hasNext();) {
+                       MessageReference node = (MessageReference) iter.next();
+                       node.decrementReferenceCount();
+                       iter.remove();
+                   }
+               }
         }
         prefetchExtension=0;
     }
@@ -171,7 +171,7 @@
             ", destinations="+destinations.size()+
             ", dispatched="+dispatched.size()+
             ", delivered="+this.prefetchExtension+
-            ", pending="+this.pending.size();
+            ", pending="+getPendingQueueSize();
     }
 
     public String getClientId() {
@@ -186,13 +186,15 @@
      * Release any references that we are holding.
      */
     synchronized public void destroy() {
-        
-        for (Iterator iter = pending.iterator(); iter.hasNext();) {
-            MessageReference node = (MessageReference) iter.next();
-            node.decrementReferenceCount();
-        }
-        pending.clear();
-        
+       
+       synchronized(pending) {
+               for (Iterator iter = pending.iterator(); iter.hasNext();) {
+                   MessageReference node = (MessageReference) iter.next();
+                   node.decrementReferenceCount();
+               }
+               pending.clear();
+       }
+       
         for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
             MessageReference node = (MessageReference) iter.next();
             node.decrementReferenceCount();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=397985&r1=397984&r2=397985&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Apr 28 12:13:54 2006
@@ -94,7 +94,6 @@
 
     synchronized public void acknowledge(final ConnectionContext context,final MessageAck ack) throws Exception{
         // Handle the standard acknowledgment case.
-        boolean wasFull=isFull();
         if(ack.isStandardAck()){
             // Acknowledge all dispatched messages up till the message id of the acknowledgment.
             int index=0;
@@ -129,9 +128,7 @@
                             prefetchExtension=Math.max(prefetchExtension,index+1);
                         else
                             prefetchExtension=Math.max(0,prefetchExtension-(index+1));
-                        if(wasFull&&!isFull()){
-                            dispatchMatched();
-                        }
+                        dispatchMatched();
                         return;
                     }else{
                         // System.out.println("no match: "+ack.getLastMessageId()+","+messageId);
@@ -147,9 +144,7 @@
                 final MessageReference node=(MessageReference) iter.next();
                 if(ack.getLastMessageId().equals(node.getMessageId())){
                     prefetchExtension=Math.max(prefetchExtension,index+1);
-                    if(wasFull&&!isFull()){
-                        dispatchMatched();
-                    }
+                    dispatchMatched();
                     return;
                 }
             }
@@ -176,9 +171,7 @@
                     acknowledge(context,ack,node);
                     if(ack.getLastMessageId().equals(messageId)){
                         prefetchExtension=Math.max(0,prefetchExtension-(index+1));
-                        if(wasFull&&!isFull()){
-                            dispatchMatched();
-                        }
+                        dispatchMatched();
                         return;
                     }
                 }
@@ -226,8 +219,10 @@
         return (dispatched.size()-prefetchExtension) >= (info.getPrefetchSize() *.9);
     }
     
-    synchronized public int getPendingQueueSize(){
-        return pending.size();
+    public int getPendingQueueSize(){
+       synchronized(pending) {
+               return pending.size();
+       }
     }
     
     synchronized public int getDispatchedQueueSize(){
@@ -312,16 +307,13 @@
     }
 
     synchronized protected void onDispatch(final MessageReference node,final Message message){
-        boolean wasFull=isFull();
         if(node.getRegionDestination()!=null){
             node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
             context.getConnection().getStatistics().onMessageDequeue(message);
-            if(wasFull&&!isFull()){
-                try{
-                    dispatchMatched();
-                }catch(IOException e){
-                    context.getConnection().serviceException(e);
-                }
+            try{
+                dispatchMatched();
+            }catch(IOException e){
+                context.getConnection().serviceException(e);
             }
         }
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?rev=397985&r1=397984&r2=397985&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java Fri Apr 28 12:13:54 2006
@@ -47,7 +47,7 @@
             ", destinations="+destinations.size()+
             ", dispatched="+dispatched.size()+
             ", delivered="+this.prefetchExtension+
-            ", pending="+this.pending.size();
+            ", pending="+getPendingQueueSize();
     }
 
     public void browseDone() throws Exception {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=397985&r1=397984&r2=397985&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Fri Apr 28 12:13:54 2006
@@ -125,7 +125,7 @@
             ", destinations="+destinations.size()+
             ", dispatched="+dispatched.size()+
             ", delivered="+this.prefetchExtension+
-            ", pending="+this.pending.size();
+            ", pending="+getPendingQueueSize();
     }
 
     public int getLockPriority() {


Reply via email to