Author: gtully
Date: Thu Nov 19 10:48:16 2009
New Revision: 882100
URL: http://svn.apache.org/viewvc?rev=882100&view=rev
Log:
svn merge -c 882096 - resolve
https://issues.apache.org/activemq/browse/AMQ-2487 - usage management of
storecursor iterator was broken in that a browse would decrement the usage.
memory management across move and retry operations is now correct. modified
some tests to validate memory usage
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=882100&r1=882099&r2=882100&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Thu Nov 19 10:48:16 2009
@@ -723,11 +723,7 @@
.getDestination());
if
(context.getBroker()==null) {
context.setBroker(getRoot());
- }
-
- // Clear out the memory
usage for the old queue.
- // We'll reset it to
the DLQ below:
-
message.setMemoryUsage(null);
+ }
BrokerSupport.resendNoCopy(context,message,
deadLetterDestination);
}
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=882100&r1=882099&r2=882100&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
Thu Nov 19 10:48:16 2009
@@ -43,6 +43,7 @@
protected ActiveMQMessageAudit audit;
protected boolean useCache=true;
private boolean started=false;
+ protected MessageReference last = null;
public synchronized void start() throws Exception {
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=882100&r1=882099&r2=882100&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Thu Nov 19 10:48:16 2009
@@ -113,6 +113,7 @@
private synchronized void clearIterator(boolean ensureIterator) {
boolean haveIterator = this.iterator != null;
this.iterator=null;
+ last = null;
if(haveIterator&&ensureIterator) {
ensureIterator();
}
@@ -142,11 +143,11 @@
}
public final synchronized MessageReference next() {
- Message result = null;
+ MessageReference result = null;
if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
result = this.iterator.next().getValue();
- result.decrementReferenceCount();
}
+ last = result;
return result;
}
@@ -182,6 +183,9 @@
if (iterator!=null) {
iterator.remove();
}
+ if (last != null) {
+ last.decrementReferenceCount();
+ }
if (size==0 && isStarted() && useCache && hasSpace() && getStoreSize()
== 0) {
if (LOG.isDebugEnabled()) {
LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + "
enabling cache on last remove");
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=882100&r1=882099&r2=882100&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
Thu Nov 19 10:48:16 2009
@@ -58,8 +58,6 @@
private boolean iterating;
private boolean flushRequired;
private AtomicBoolean started = new AtomicBoolean();
- private MessageReference last = null;
-
/**
* @param name
* @param store
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?rev=882100&r1=882099&r2=882100&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
Thu Nov 19 10:48:16 2009
@@ -35,8 +35,6 @@
public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
private LinkedList<MessageReference> list = new
LinkedList<MessageReference>();
private Iterator<MessageReference> iter;
- private MessageReference last;
-
public VMPendingMessageCursor(){
this.useCache=false;
}
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java?rev=882100&r1=882099&r2=882100&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/util/BrokerSupport.java
Thu Nov 19 10:48:16 2009
@@ -52,6 +52,7 @@
message.setOriginalTransactionId(message.getTransactionId());
message.setDestination(deadLetterDestination);
message.setTransactionId(null);
+ message.setMemoryUsage(null);
boolean originalFlowControl = context.isProducerFlowControl();
try {
context.setProducerFlowControl(false);
Modified:
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL:
http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=882100&r1=882099&r2=882100&view=diff
==============================================================================
---
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
(original)
+++
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
Thu Nov 19 10:48:16 2009
@@ -40,6 +40,8 @@
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.BaseDestination;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.commons.logging.Log;
@@ -120,6 +122,7 @@
messageIDs[i] = messageID;
}
+ assertTrue("dest has some memory usage", queue.getMemoryPercentUsage()
> 0);
echo("About to move " + messageCount + " messages");
@@ -138,11 +141,15 @@
echo("Now browsing the second queue");
queueViewMBeanName = assertRegisteredObjectName(domain +
":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
- queue =
(QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
+ QueueViewMBean queueNew =
(QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
queueViewMBeanName, QueueViewMBean.class, true);
- long newQueuesize = queue.getQueueSize();
+ long newQueuesize = queueNew.getQueueSize();
echo("Second queue size: " + newQueuesize);
assertEquals("Unexpected number of messages ",messageCount,
newQueuesize);
+
+ // check memory usage migration
+ assertTrue("new dest has some memory usage",
queueNew.getMemoryPercentUsage() > 0);
+ assertEquals("old dest has no memory usage", 0,
queue.getMemoryPercentUsage());
}
public void testRetryMessages() throws Exception {
@@ -164,7 +171,7 @@
long initialQueueSize = queue.getQueueSize();
echo("current queue size: " + initialQueueSize);
-
+ assertTrue("dest has some memory usage", queue.getMemoryPercentUsage()
> 0);
// lets create a duff consumer which keeps rolling back...
Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
@@ -203,6 +210,10 @@
messageIDs[i] = messageID;
}
+ int dlqMemUsage = dlq.getMemoryPercentUsage();
+ assertTrue("dlq has some memory usage", dlqMemUsage > 0);
+ assertEquals("dest has no memory usage", 0,
queue.getMemoryPercentUsage());
+
echo("About to retry " + messageCount + " messages");
@@ -223,6 +234,10 @@
assertEquals("DLQ size", initialDlqSize - messageCount, dlqSize);
assertEquals("queue size", initialQueueSize, queueSize);
assertEquals("browse queue size", initialQueueSize, actualCount);
+
+ assertEquals("dest has some memory usage", dlqMemUsage,
queue.getMemoryPercentUsage());
+ assertEquals("dlq still has memory usage", dlqMemUsage,
dlq.getMemoryPercentUsage());
+
}
public void testMoveMessagesBySelector() throws Exception {
@@ -246,6 +261,7 @@
queue.removeMatchingMessages("counter > 2");
assertEquals("Should have no more messages in the queue: " +
queueViewMBeanName, 0, queue.getQueueSize());
+ assertEquals("dest has no memory usage", 0,
queue.getMemoryPercentUsage());
}
public void testCopyMessagesBySelector() throws Exception {
@@ -272,6 +288,7 @@
queue.removeMatchingMessages("counter > 2");
assertEquals("Should have no more messages in the queue: " +
queueViewMBeanName, 0, queue.getQueueSize());
+ assertEquals("dest has no memory usage", 0,
queue.getMemoryPercentUsage());
}
@@ -528,7 +545,14 @@
answer.setPersistent(false);
answer.setDeleteAllMessagesOnStartup(true);
answer.setUseJmx(true);
- //answer.setEnableStatistics(true);
+
+ // apply memory limit so that %usage is visible
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry defaultEntry = new PolicyEntry();
+ defaultEntry.setMemoryLimit(1024*1024*4);
+ policyMap.setDefaultEntry(defaultEntry);
+ answer.setDestinationPolicy(policyMap);
+
answer.addConnector(bindAddress);
return answer;
}