Author: chirino
Date: Mon Apr 17 08:32:28 2006
New Revision: 394707

URL: http://svn.apache.org/viewcvs?rev=394707&view=rev
Log:
If a topic consumer was hung up, it would eventually stop the producers since 
the broker memory limit would be reached.
The problem was if the consumer was killed, the broker memory would not get 
freed up and so the producer would remain blocked.
When a subscription is removed, the memory of the pending messages are now 
released.

Modified:
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=394707&r1=394706&r2=394707&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
 Mon Apr 17 08:32:28 2006
@@ -180,9 +180,11 @@
         }
         
         destroySubscription(sub);
+        
     }
 
     protected void destroySubscription(Subscription sub) {        
+        sub.destroy();
     }
 
     public void removeSubscription(ConnectionContext context, 
RemoveSubscriptionInfo info) throws Exception {

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=394707&r1=394706&r2=394707&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
 Mon Apr 17 08:32:28 2006
@@ -181,5 +181,24 @@
     public SubscriptionKey getSubscriptionKey() {
         return subscriptionKey;
     }
+    
+    /**
+     * Release any references that we are holding.
+     */
+    synchronized public void destroy() {
+        
+        for (Iterator iter = pending.iterator(); iter.hasNext();) {
+            MessageReference node = (MessageReference) iter.next();
+            node.decrementReferenceCount();
+        }
+        pending.clear();
+        
+        for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
+            MessageReference node = (MessageReference) iter.next();
+            node.decrementReferenceCount();
+        }
+        dispatched.clear();
+        
+    }
 
 }

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=394707&r1=394706&r2=394707&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 Mon Apr 17 08:32:28 2006
@@ -372,5 +372,4 @@
     protected void acknowledge(ConnectionContext context,final MessageAck 
ack,final MessageReference node)
                     throws IOException{}
 
-
 }

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=394707&r1=394706&r2=394707&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
 Mon Apr 17 08:32:28 2006
@@ -28,6 +28,7 @@
 import javax.jms.InvalidSelectorException;
 
 import java.io.IOException;
+import java.util.Iterator;
 
 public class QueueSubscription extends PrefetchSubscription implements 
LockOwner {
     
@@ -184,4 +185,9 @@
         }
     }
     
+    /**
+     */
+    synchronized public void destroy() {        
+    }
+
 }

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=394707&r1=394706&r2=394707&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
 Mon Apr 17 08:32:28 2006
@@ -170,4 +170,10 @@
      *
      */
     public void optimizePrefetch();
+    
+    /**
+     * Called when the subscription is destroyed.
+     */
+    public void destroy();
+
 }

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=394707&r1=394706&r2=394707&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
 Mon Apr 17 08:32:28 2006
@@ -53,7 +53,8 @@
     private MessageEvictionStrategy messageEvictionStrategy = new 
OldestMessageEvictionStrategy();
     private int discarded = 0;
     private final Object matchedListMutex=new Object();
-    long enqueueCounter;
+    private final AtomicLong enqueueCounter = new AtomicLong(0);
+    private final AtomicLong dequeueCounter = new AtomicLong(0);
     
     public TopicSubscription(Broker broker,ConnectionContext 
context,ConsumerInfo info,UsageManager usageManager)
                     throws InvalidSelectorException{
@@ -62,8 +63,10 @@
     }
 
     public void add(MessageReference node) throws 
InterruptedException,IOException{
-        enqueueCounter++;
+        
+        enqueueCounter.incrementAndGet();        
         node.incrementReferenceCount();
+        
         if(!isFull()&&!isSlaveBroker()){
             optimizePrefetch();
             // if maximumPendingMessages is set we will only discard messages 
which
@@ -131,6 +134,7 @@
     }
 
     synchronized public void acknowledge(final ConnectionContext context,final 
MessageAck ack) throws Exception{
+        
         // Handle the standard acknowledgment case.
         boolean wasFull=isFull();
         if(ack.isStandardAck()||ack.isPoisonAck()){
@@ -138,11 +142,13 @@
                 delivered.addAndGet(ack.getMessageCount());
                 context.getTransaction().addSynchronization(new 
Synchronization(){
                     public void afterCommit() throws Exception{
+                        dequeueCounter.addAndGet(ack.getMessageCount());
                         dispatched.addAndGet(-ack.getMessageCount());
                         
delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
                     }
                 });
             }else{
+                dequeueCounter.addAndGet(ack.getMessageCount());
                 dispatched.addAndGet(-ack.getMessageCount());
                 
delivered.set(Math.max(0,delivered.get()-ack.getMessageCount()));
             }
@@ -178,14 +184,13 @@
        }
 
        public long getEnqueueCounter() {
-               return enqueueCounter;
+               return enqueueCounter.get();
        }
+    
     public long getDequeueCounter(){
-        return delivered.get();
+        return dequeueCounter.get();
     }
 
-
-
     /**
      * @return the number of messages discarded due to being a slow consumer
      */
@@ -313,6 +318,16 @@
     public String toString(){
         return "TopicSubscription:"+" consumer="+info.getConsumerId()+", 
destinations="+destinations.size()
                         +", dispatched="+getDispatchedQueueSize()+", 
delivered="+getDequeueCounter()+", matched="+matched()+", 
discarded="+discarded();
+    }
+
+    public void destroy() {
+        synchronized(matchedListMutex){
+            for (Iterator iter = matched.iterator(); iter.hasNext();) {
+                MessageReference node = (MessageReference) iter.next();
+                node.decrementReferenceCount();
+            }
+            matched.clear();
+        }
     }
 
 }


Reply via email to