Author: tabish
Date: Wed May 23 20:27:23 2012
New Revision: 1342028
URL: http://svn.apache.org/viewvc?rev=1342028&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3669
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java?rev=1342028&r1=1342027&r2=1342028&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/MirroredQueue.java
Wed May 23 20:27:23 2012
@@ -55,6 +55,7 @@ public class MirroredQueue implements De
message = message.copy();
}
message.setDestination(destination.getActiveMQDestination());
+ message.setMemoryUsage(null); // set this to null so that it will use the queue memoryUsage instance instead of the topic.
super.send(context, message);
}
};
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java?rev=1342028&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java
Wed May 23 20:27:23 2012
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.virtual.MirroredQueue;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.usage.MemoryUsage;
+import org.apache.activemq.usage.StoreUsage;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.usage.TempUsage;
+import org.apache.activemq.util.IOHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+
+/**
+ * Verifies that producer flow control does not kick in when a mirrored queue
+ * is used. The original MirroredQueue implementation caused the queue to
+ * update the topic's memory usage instead of the queue's own memory usage.
+ * The reason is that a message's memory usage instance is only assigned when
+ * it is null. It was null when the message was initially sent to the topic,
+ * but it was already non-null when the message was then sent to the queue.
+ * So when the region destination was set to the queue, the associated memory
+ * usage was not switched to the queue's instance, and the memory usage of
+ * the topic was updated instead.
+ *
+ * @author Claudio Corsi
+ */
+public class MirroredQueueCorrectMemoryUsageTest extends EmbeddedBrokerTestSupport {
+
+    private static final Logger logger = LoggerFactory.getLogger(MirroredQueueCorrectMemoryUsageTest.class);
+
+    // Size constants in bytes: 0x0100000 == 1,048,576.
+    private static final long ONE_MB = 0x0100000;
+    private static final long TEN_MB = ONE_MB * 10;
+    private static final long TWENTY_MB = TEN_MB * 2;
+
+    /** Name of the queue that is registered with the broker and used by the test producer. */
+    private static final String CREATED_STATIC_FOR_PERSISTENT = "created.static.for.persistent";
+
+    /**
+     * Always reports {@code true}.
+     * NOTE(review): presumably consulted by the base class when it builds the
+     * broker - confirm against EmbeddedBrokerTestSupport.
+     */
+    @Override
+    protected boolean isPersistent() {
+        return true;
+    }
+
+    /**
+     * Builds the broker used by the test: a MirroredQueue destination
+     * interceptor, per-destination memory limits with producer flow control
+     * enabled (1 MB for the topic entry, 10 MB for the test queue), explicit
+     * system usage limits, and a KahaDB persistence adapter whose directory
+     * is deleted first.
+     *
+     * @return the configured broker returned by {@code super.createBroker()}
+     * @throws Exception if the base class fails to create the broker or the
+     *         persistence adapter cannot be configured
+     */
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        // Create the broker service instance.
+        BrokerService broker = super.createBroker();
+
+        // Create and add the mirrored queue destination interceptor
+        // (message copying on, empty prefix, ".qmirror" postfix).
+        MirroredQueue mq = new MirroredQueue();
+        mq.setCopyMessage(true);
+        mq.setPrefix("");
+        mq.setPostfix(".qmirror");
+        broker.setDestinationInterceptors(new DestinationInterceptor[] { mq });
+
+        // Create the destination policy for the topics and queues.
+        PolicyMap policyMap = new PolicyMap();
+        List<PolicyEntry> entries = new LinkedList<PolicyEntry>();
+
+        // Topic policy entry for the ">" destination. Its memory limit is a
+        // tenth of the queue limit configured below.
+        PolicyEntry policyEntry = new PolicyEntry();
+        super.useTopic = true; // makes createDestination() produce a topic (asserted below)
+        ActiveMQDestination destination = super.createDestination(">");
+        Assert.isTrue(destination.isTopic(), "Created destination was not a topic");
+        policyEntry.setDestination(destination);
+        policyEntry.setProducerFlowControl(true);
+        policyEntry.setMemoryLimit(ONE_MB);
+        entries.add(policyEntry);
+
+        // Queue policy entry for the test queue.
+        policyEntry = new PolicyEntry();
+        super.useTopic = false; // makes createDestination() produce a queue (asserted below)
+        destination = super.createDestination(CREATED_STATIC_FOR_PERSISTENT);
+        Assert.isTrue(destination.isQueue(), "Created destination was not a queue");
+        policyEntry.setDestination(destination);
+        policyEntry.setProducerFlowControl(true);
+        policyEntry.setMemoryLimit(TEN_MB);
+        entries.add(policyEntry);
+
+        policyMap.setPolicyEntries(entries);
+        broker.setDestinationPolicy(policyMap);
+
+        // Register the test queue (the last destination created above) with the broker.
+        broker.setDestinations(new ActiveMQDestination[] { destination });
+
+        // Set system usage: 10 MB memory, 20 MB store, 10 MB temp.
+        SystemUsage memoryManager = new SystemUsage();
+        MemoryUsage memoryUsage = new MemoryUsage();
+        memoryUsage.setLimit(TEN_MB);
+        memoryManager.setMemoryUsage(memoryUsage);
+        StoreUsage storeUsage = new StoreUsage();
+        storeUsage.setLimit(TWENTY_MB);
+        memoryManager.setStoreUsage(storeUsage);
+        TempUsage tempDiskUsage = new TempUsage();
+        tempDiskUsage.setLimit(TEN_MB);
+        memoryManager.setTempUsage(tempDiskUsage);
+        broker.setSystemUsage(memoryManager);
+
+        // Set the persistence adapter, deleting whatever its directory
+        // currently holds so no messages survive from a previous run.
+        KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
+        persistenceAdapter.setJournalMaxFileLength((int) TEN_MB);
+        IOHelper.deleteFile(persistenceAdapter.getDirectory());
+        broker.setPersistenceAdapter(persistenceAdapter);
+        return broker;
+    }
+
+    /**
+     * Delegates to the base class set-up.
+     * NOTE(review): JUnit 4 requires {@code @Before} methods to be public;
+     * this one is protected, so the annotation presumably has no effect and
+     * the method runs through the base class's own lifecycle - confirm
+     * against EmbeddedBrokerTestSupport.
+     */
+    @Before
+    protected void setUp() throws Exception {
+        super.setUp();
+    }
+
+    /**
+     * Delegates to the base class tear-down.
+     * NOTE(review): same visibility caveat as {@code setUp()} applies to
+     * {@code @After}.
+     */
+    @After
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    /**
+     * Sends 11,999 persistent text messages of 1024 characters each to the
+     * test queue. The test passes if every send returns, i.e. the producer
+     * is never blocked.
+     * NOTE(review): the 40 s timeout is only enforced when the class is run
+     * by a JUnit 4 runner - confirm.
+     *
+     * @throws Exception if the connection cannot be created or a send fails
+     */
+    @Test(timeout=40000)
+    public void testNoMemoryUsageIncreaseForTopic() throws Exception {
+        Connection connection = super.createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            Destination destination = session.createQueue(CREATED_STATIC_FOR_PERSISTENT);
+            MessageProducer producer = session.createProducer(destination);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+            // Every message carries the same 1k payload, so build it once.
+            char[] m = new char[1024];
+            Arrays.fill(m, 'x');
+            String payload = new String(m);
+
+            // create some messages that have 1k each
+            for (int i = 1; i < 12000; i++) {
+                producer.send(session.createTextMessage(payload));
+                logger.debug("Sent message: {}", i);
+            }
+
+            producer.close();
+            session.close();
+            connection.stop();
+        } finally {
+            // Always release the connection, even when a send throws;
+            // closing it also closes any session/producer still open.
+            connection.close();
+        }
+    }
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java
------------------------------------------------------------------------------
svn:eol-style = native