Author: gtully
Date: Thu Nov 19 10:33:41 2009
New Revision: 882096

URL: http://svn.apache.org/viewvc?rev=882096&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2487 - usage management 
of storecursor iterator was broken in that a browse would decrement the usage. 
memory management across move and retry operations is now correct. modified 
some tests to validate memory usage

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=882096&r1=882095&r2=882096&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
 Thu Nov 19 10:33:41 2009
@@ -723,11 +723,7 @@
                                                                        
.getDestination());
                                                        if 
(context.getBroker()==null) {
                                                                
context.setBroker(getRoot());
-                                                       }                       
 
-                                                                               
   
-                                                       // Clear out the memory 
usage for the old queue. 
-                                                       // We'll reset it to 
the DLQ below:
-                                                       
message.setMemoryUsage(null);
+                                                       }
                                                        
BrokerSupport.resendNoCopy(context,message,
                                                                
deadLetterDestination);
                                                }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=882096&r1=882095&r2=882096&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
 Thu Nov 19 10:33:41 2009
@@ -43,6 +43,7 @@
     protected ActiveMQMessageAudit audit;
     protected boolean useCache=true;
     private boolean started=false;
+    protected MessageReference last = null;
   
 
     public synchronized void start() throws Exception  {

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=882096&r1=882095&r2=882096&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
 Thu Nov 19 10:33:41 2009
@@ -113,6 +113,7 @@
     private synchronized void clearIterator(boolean ensureIterator) {
         boolean haveIterator = this.iterator != null;
         this.iterator=null;
+        last = null;
         if(haveIterator&&ensureIterator) {
             ensureIterator();
         }
@@ -142,11 +143,11 @@
     }
     
     public final synchronized MessageReference next() {
-        Message result = null;
+        MessageReference result = null;
         if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
             result = this.iterator.next().getValue();
-            result.decrementReferenceCount();
         }
+        last = result;
         return result;
     }
     
@@ -182,6 +183,9 @@
         if (iterator!=null) {
             iterator.remove();
         }
+        if (last != null) {
+            last.decrementReferenceCount();
+        }
         if (size==0 && isStarted() && useCache && hasSpace() && getStoreSize() 
== 0) {
             if (LOG.isDebugEnabled()) {
                 
LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + " 
enabling cache on last remove");

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=882096&r1=882095&r2=882096&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
 Thu Nov 19 10:33:41 2009
@@ -58,8 +58,6 @@
     private boolean iterating;
     private boolean flushRequired;
     private AtomicBoolean started = new AtomicBoolean();
-    private MessageReference last = null;
-
     /**
      * @param name
      * @param store

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?rev=882096&r1=882095&r2=882096&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
 Thu Nov 19 10:33:41 2009
@@ -35,8 +35,6 @@
 public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
     private LinkedList<MessageReference> list = new 
LinkedList<MessageReference>();
     private Iterator<MessageReference> iter;
-    private MessageReference last;
-    
     public VMPendingMessageCursor(){
         this.useCache=false;
     }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java?rev=882096&r1=882095&r2=882096&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
 Thu Nov 19 10:33:41 2009
@@ -52,6 +52,7 @@
         message.setOriginalTransactionId(message.getTransactionId());
         message.setDestination(deadLetterDestination);
         message.setTransactionId(null);
+        message.setMemoryUsage(null);
         boolean originalFlowControl = context.isProducerFlowControl();
         try {
             context.setProducerFlowControl(false);

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=882096&r1=882095&r2=882096&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
 Thu Nov 19 10:33:41 2009
@@ -40,6 +40,8 @@
 import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.BaseDestination;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.commons.logging.Log;
@@ -120,6 +122,7 @@
             messageIDs[i] = messageID;
         }
 
+        assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() 
> 0);
 
         echo("About to move " + messageCount + " messages");
 
@@ -138,11 +141,15 @@
         echo("Now browsing the second queue");
 
         queueViewMBeanName = assertRegisteredObjectName(domain + 
":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
-        queue = 
(QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, 
queueViewMBeanName, QueueViewMBean.class, true);
+        QueueViewMBean queueNew = 
(QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, 
queueViewMBeanName, QueueViewMBean.class, true);
 
-        long newQueuesize = queue.getQueueSize();
+        long newQueuesize = queueNew.getQueueSize();
         echo("Second queue size: " + newQueuesize);
         assertEquals("Unexpected number of messages ",messageCount, 
newQueuesize);
+        
+        // check memory usage migration
+        assertTrue("new dest has some memory usage", 
queueNew.getMemoryPercentUsage() > 0);
+        assertEquals("old dest has no memory usage", 0, 
queue.getMemoryPercentUsage());
     }
 
     public void testRetryMessages() throws Exception {
@@ -164,7 +171,7 @@
 
         long initialQueueSize = queue.getQueueSize();
         echo("current queue size: " + initialQueueSize);
-
+        assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() 
> 0);
 
         // lets create a duff consumer which keeps rolling back...
         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
@@ -203,6 +210,10 @@
             messageIDs[i] = messageID;
         }
 
+        int dlqMemUsage = dlq.getMemoryPercentUsage();
+        assertTrue("dlq has some memory usage", dlqMemUsage > 0);
+        assertEquals("dest has no memory usage", 0, 
queue.getMemoryPercentUsage());
+        
 
         echo("About to retry " + messageCount + " messages");
 
@@ -223,6 +234,10 @@
         assertEquals("DLQ size", initialDlqSize - messageCount, dlqSize);
         assertEquals("queue size", initialQueueSize, queueSize);
         assertEquals("browse queue size", initialQueueSize, actualCount);
+        
+        assertEquals("dest has some memory usage", dlqMemUsage, 
queue.getMemoryPercentUsage());
+        assertEquals("dlq still has memory usage", dlqMemUsage, 
dlq.getMemoryPercentUsage());
+        
     }
 
     public void testMoveMessagesBySelector() throws Exception {
@@ -246,6 +261,7 @@
         queue.removeMatchingMessages("counter > 2");
 
         assertEquals("Should have no more messages in the queue: " + 
queueViewMBeanName, 0, queue.getQueueSize());
+        assertEquals("dest has no memory usage", 0, 
queue.getMemoryPercentUsage());
     }
 
     public void testCopyMessagesBySelector() throws Exception {
@@ -272,6 +288,7 @@
         queue.removeMatchingMessages("counter > 2");
 
         assertEquals("Should have no more messages in the queue: " + 
queueViewMBeanName, 0, queue.getQueueSize());
+        assertEquals("dest has no memory usage", 0, 
queue.getMemoryPercentUsage());
     }
 
 
@@ -528,7 +545,14 @@
         answer.setPersistent(false);
         answer.setDeleteAllMessagesOnStartup(true);
         answer.setUseJmx(true);
-        //answer.setEnableStatistics(true);
+       
+        // apply memory limit so that %usage is visible
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setMemoryLimit(1024*1024*4);
+        policyMap.setDefaultEntry(defaultEntry);
+        answer.setDestinationPolicy(policyMap);
+        
         answer.addConnector(bindAddress);
         return answer;
     }


Reply via email to