Repository: activemq Updated Branches: refs/heads/master a3c8bee1f -> a49d46e3c
https://issues.apache.org/jira/browse/AMQ-5748 Updating MemoryTopicMessageStore to decrement store statistics on cache eviction. Updating KahaDBMessageStoreSizeStatTest to account for the fact that an LRU cache is used so the last 100 messages are kept in memory. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a49d46e3 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a49d46e3 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a49d46e3 Branch: refs/heads/master Commit: a49d46e3ca689af6f2cb721c457be97d654b2492 Parents: a3c8bee Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Sat Aug 8 17:55:41 2015 +0000 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Sat Aug 8 17:59:19 2015 +0000 ---------------------------------------------------------------------- .../store/memory/MemoryMessageStore.java | 16 +- .../store/memory/MemoryTopicMessageStore.java | 56 ++++++- .../store/AbstractMessageStoreSizeStatTest.java | 150 ++++++++++++++----- .../kahadb/KahaDBMessageStoreSizeStatTest.java | 4 +- .../MultiKahaDBMessageStoreSizeStatTest.java | 12 +- .../memory/MemoryMessageStoreSizeStatTest.java | 57 ++++++- 6 files changed, 242 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/a49d46e3/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java index 51006c2..3989646 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java +++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java @@ -57,8 +57,7 @@ public class MemoryMessageStore extends AbstractMessageStore { public synchronized void addMessage(ConnectionContext context, Message message) throws IOException { synchronized (messageTable) { messageTable.put(message.getMessageId(), message); - getMessageStoreStatistics().getMessageCount().increment(); - getMessageStoreStatistics().getMessageSize().addSize(message.getSize()); + incMessageStoreStatistics(message); } message.incrementReferenceCount(); message.getMessageId().setFutureOrSequenceLong(sequenceId++); @@ -94,8 +93,7 @@ public class MemoryMessageStore extends AbstractMessageStore { Message removed = messageTable.remove(msgId); if( removed !=null ) { removed.decrementReferenceCount(); - getMessageStoreStatistics().getMessageCount().decrement(); - getMessageStoreStatistics().getMessageSize().addSize(-removed.getSize()); + decMessageStoreStatistics(removed); } if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) { lastBatchId = null; @@ -200,4 +198,14 @@ public class MemoryMessageStore extends AbstractMessageStore { } } + protected final void incMessageStoreStatistics(Message message) { + getMessageStoreStatistics().getMessageCount().increment(); + getMessageStoreStatistics().getMessageSize().addSize(message.getSize()); + } + + protected final void decMessageStoreStatistics(Message message) { + getMessageStoreStatistics().getMessageCount().decrement(); + getMessageStoreStatistics().getMessageSize().addSize(-message.getSize()); + } + } http://git-wip-us.apache.org/repos/asf/activemq/blob/a49d46e3/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java index 0debfe6..142547f 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; + import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; @@ -29,36 +30,47 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.MessageStoreStatistics; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.util.LRUCache; import org.apache.activemq.util.SubscriptionKey; /** - * + * */ public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore { private Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase; private Map<SubscriptionKey, MemoryTopicSub> topicSubMap; + private final Map<MessageId, Message> originalMessageTable; public MemoryTopicMessageStore(ActiveMQDestination destination) { - this(destination, new LRUCache<MessageId, Message>(100, 100, 0.75f, false), makeSubscriptionInfoMap()); + this(destination, new MemoryTopicMessageStoreLRUCache(100, 100, 0.75f, false), makeSubscriptionInfoMap()); + + //Set the messageStoreStatistics after the super class is initialized so that the stats can be + //properly updated on cache eviction + MemoryTopicMessageStoreLRUCache cache = (MemoryTopicMessageStoreLRUCache) originalMessageTable; + cache.setMessageStoreStatistics(messageStoreStatistics); } public MemoryTopicMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable, 
Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase) { super(destination, messageTable); this.subscriberDatabase = subscriberDatabase; this.topicSubMap = makeSubMap(); + //this is only necessary so that messageStoreStatistics can be set if necessary + //We need the original reference since messageTable is wrapped in a synchronized map in the parent class + this.originalMessageTable = messageTable; } protected static Map<SubscriptionKey, SubscriptionInfo> makeSubscriptionInfoMap() { return Collections.synchronizedMap(new HashMap<SubscriptionKey, SubscriptionInfo>()); } - + protected static Map<SubscriptionKey, MemoryTopicSub> makeSubMap() { return Collections.synchronizedMap(new HashMap<SubscriptionKey, MemoryTopicSub>()); } + @Override public synchronized void addMessage(ConnectionContext context, Message message) throws IOException { super.addMessage(context, message); for (Iterator<MemoryTopicSub> i = topicSubMap.values().iterator(); i.hasNext();) { @@ -67,6 +79,7 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic } } + @Override public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException { SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); @@ -76,10 +89,12 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic } } + @Override public synchronized SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { return subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName)); } + @Override public synchronized void addSubscription(SubscriptionInfo info, boolean retroactive) throws IOException { SubscriptionKey key = new SubscriptionKey(info); MemoryTopicSub sub = new MemoryTopicSub(); @@ -93,12 +108,14 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic subscriberDatabase.put(key, info); } + 
@Override public synchronized void deleteSubscription(String clientId, String subscriptionName) { org.apache.activemq.util.SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName); subscriberDatabase.remove(key); topicSubMap.remove(key); } + @Override public synchronized void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception { MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName)); if (sub != null) { @@ -106,16 +123,19 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic } } + @Override public synchronized void delete() { super.delete(); subscriberDatabase.clear(); topicSubMap.clear(); } + @Override public SubscriptionInfo[] getAllSubscriptions() throws IOException { return subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]); } + @Override public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException { int result = 0; MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName)); @@ -125,6 +145,7 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic return result; } + @Override public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception { MemoryTopicSub sub = this.topicSubMap.get(new SubscriptionKey(clientId, subscriptionName)); if (sub != null) { @@ -132,10 +153,39 @@ public class MemoryTopicMessageStore extends MemoryMessageStore implements Topic } } + @Override public void resetBatching(String clientId, String subscriptionName) { MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName)); if (sub != null) { sub.resetBatching(); } } + + /** + * Since we initialize the store with a LRUCache in some cases, we need to account for cache evictions + * when computing the message store 
statistics. + * + */ + private static class MemoryTopicMessageStoreLRUCache extends LRUCache<MessageId, Message> { + private static final long serialVersionUID = -342098639681884413L; + private MessageStoreStatistics messageStoreStatistics; + + public MemoryTopicMessageStoreLRUCache(int initialCapacity, int maximumCacheSize, + float loadFactor, boolean accessOrder) { + super(initialCapacity, maximumCacheSize, loadFactor, accessOrder); + } + + public void setMessageStoreStatistics( + MessageStoreStatistics messageStoreStatistics) { + this.messageStoreStatistics = messageStoreStatistics; + } + + @Override + protected void onCacheEviction(Map.Entry<MessageId, Message> eldest) { + if (messageStoreStatistics != null) { + messageStoreStatistics.getMessageCount().decrement(); + messageStoreStatistics.getMessageSize().addSize(-eldest.getValue().getSize()); + } + } + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/a49d46e3/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java index 1b927b4..944d183 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java @@ -35,6 +35,7 @@ import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; import javax.management.ObjectName; import javax.management.openmbean.CompositeData; import javax.management.openmbean.TabularData; @@ -47,6 +48,8 @@ import org.apache.activemq.broker.region.Destination; import org.apache.activemq.command.ActiveMQDestination; import 
org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.Wait; +import org.apache.activemq.util.Wait.Condition; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -67,6 +70,7 @@ public abstract class AbstractMessageStoreSizeStatTest { protected BrokerService broker; protected URI brokerConnectURI; protected String defaultQueueName = "test.queue"; + protected String defaultTopicName = "test.topic"; protected static int messageSize = 1000; @Before @@ -100,34 +104,67 @@ public abstract class AbstractMessageStoreSizeStatTest { @Test public void testMessageSize() throws Exception { - Destination dest = publishTestMessages(200); + Destination dest = publishTestQueueMessages(200); verifyStats(dest, 200, 200 * messageSize); } @Test public void testMessageSizeAfterConsumption() throws Exception { - Destination dest = publishTestMessages(200); + Destination dest = publishTestQueueMessages(200); verifyStats(dest, 200, 200 * messageSize); - consumeTestMessages(); - Thread.sleep(3000); + consumeTestQueueMessages(); + verifyStats(dest, 0, 0); } @Test - public void testMessageSizeDurable() throws Exception { + public void testMessageSizeOneDurable() throws Exception { + + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); + connection.setClientID("clientId"); + connection.start(); - Destination dest = publishTestMessagesDurable(); + Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 200); //verify the count and size verifyStats(dest, 200, 200 * messageSize); + //consume all messages + consumeDurableTestMessages(connection, "sub1", 200); + + //All messages should now be gone + verifyStats(dest, 0, 0); + + connection.close(); + } + + @Test(timeout=10000) + public void testMessageSizeTwoDurables() throws Exception { + + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); + 
connection.setClientID("clientId"); + connection.start(); + + Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 200); + + //verify the count and size + verifyStats(dest, 200, 200 * messageSize); + + //consume messages just for sub1 + consumeDurableTestMessages(connection, "sub1", 200); + + //There is still a durable that hasn't consumed so the messages should exist + verifyStats(dest, 200, 200 * messageSize); + + connection.stop(); + } @Test public void testMessageSizeAfterDestinationDeletion() throws Exception { - Destination dest = publishTestMessages(200); + Destination dest = publishTestQueueMessages(200); verifyStats(dest, 200, 200 * messageSize); //check that the size is 0 after deletion @@ -135,18 +172,34 @@ public abstract class AbstractMessageStoreSizeStatTest { verifyStats(dest, 0, 0); } - protected void verifyStats(Destination dest, int count, long minimumSize) throws Exception { - MessageStore messageStore = dest.getMessageStore(); - MessageStoreStatistics storeStats = dest.getMessageStore().getMessageStoreStatistics(); - assertEquals(messageStore.getMessageCount(), count); - assertEquals(messageStore.getMessageCount(), - storeStats.getMessageCount().getCount()); - assertEquals(messageStore.getMessageSize(), + protected void verifyStats(Destination dest, final int count, final long minimumSize) throws Exception { + final MessageStore messageStore = dest.getMessageStore(); + final MessageStoreStatistics storeStats = dest.getMessageStore().getMessageStoreStatistics(); + + Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return (count == messageStore.getMessageCount()) && (messageStore.getMessageCount() == + storeStats.getMessageCount().getCount()) && (messageStore.getMessageSize() == messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize()); + } + }); + if (count > 0) { assertTrue(storeStats.getMessageSize().getTotalSize() > minimumSize); + 
Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return storeStats.getMessageSize().getTotalSize() > minimumSize; + } + }); } else { - assertEquals(storeStats.getMessageSize().getTotalSize(), 0); + Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return storeStats.getMessageSize().getTotalSize() == 0; + } + }); } } @@ -166,11 +219,11 @@ public abstract class AbstractMessageStoreSizeStatTest { } - protected Destination publishTestMessages(int count) throws Exception { - return publishTestMessages(count, defaultQueueName); + protected Destination publishTestQueueMessages(int count) throws Exception { + return publishTestQueueMessages(count, defaultQueueName); } - protected Destination publishTestMessages(int count, String queueName) throws Exception { + protected Destination publishTestQueueMessages(int count, String queueName) throws Exception { // create a new queue final ActiveMQDestination activeMqQueue = new ActiveMQQueue( queueName); @@ -196,17 +249,21 @@ public abstract class AbstractMessageStoreSizeStatTest { } } finally { - connection.stop(); + connection.close(); } return dest; } - protected Destination consumeTestMessages() throws Exception { - return consumeTestMessages(defaultQueueName); + protected Destination consumeTestQueueMessages() throws Exception { + return consumeTestQueueMessages(defaultQueueName); + } + + protected Destination consumeDurableTestMessages(Connection connection, String sub, int size) throws Exception { + return consumeDurableTestMessages(connection, sub, size, defaultTopicName); } - protected Destination consumeTestMessages(String queueName) throws Exception { + protected Destination consumeTestQueueMessages(String queueName) throws Exception { // create a new queue final ActiveMQDestination activeMqQueue = new ActiveMQQueue( queueName); @@ -235,22 +292,45 @@ public abstract class AbstractMessageStoreSizeStatTest { return dest; } - 
protected Destination publishTestMessagesDurable() throws Exception { + protected Destination consumeDurableTestMessages(Connection connection, String sub, int size, String topicName) throws Exception { // create a new queue final ActiveMQDestination activeMqTopic = new ActiveMQTopic( - "test.topic"); + topicName); + + Destination dest = broker.getDestination(activeMqTopic); + + Session session = connection.createSession(false, + QueueSession.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(topicName); + + try { + TopicSubscriber consumer = session.createDurableSubscriber(topic, sub); + for (int i = 0; i < size; i++) { + consumer.receive(); + } + + } finally { + session.close(); + } + + return dest; + } + + protected Destination publishTestMessagesDurable(Connection connection, String[] subNames, int publishSize, int expectedSize) throws Exception { + // create a new queue + final ActiveMQDestination activeMqTopic = new ActiveMQTopic( + defaultTopicName); Destination dest = broker.getDestination(activeMqTopic); // Start the connection - Connection connection = new ActiveMQConnectionFactory(brokerConnectURI) - .createConnection(); - connection.setClientID("clientId"); - connection.start(); + Session session = connection.createSession(false, TopicSession.AUTO_ACKNOWLEDGE); - Topic topic = session.createTopic("test.topic"); - session.createDurableSubscriber(topic, "sub1"); + Topic topic = session.createTopic(defaultTopicName); + for (String subName : subNames) { + session.createDurableSubscriber(topic, subName); + } // browse the durable sub - this test is to verify that browsing (which calls createTopicMessageStore) //in KahaDBStore will not create a brand new store (ie uses the cache) If the cache is not used, @@ -263,21 +343,21 @@ public abstract class AbstractMessageStoreSizeStatTest { // store MessageProducer prod = session.createProducer(topic); prod.setDeliveryMode(DeliveryMode.PERSISTENT); - for (int i = 0; i < 200; i++) { + for (int i = 0; i < 
publishSize; i++) { prod.send(createMessage(session)); } - //verify the view has 200 messages - assertEquals(1, subs.length); + //verify the view has expected messages + assertEquals(subNames.length, subs.length); ObjectName subName = subs[0]; DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean) broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true); CompositeData[] data = sub.browse(); assertNotNull(data); - assertEquals(200, data.length); + assertEquals(expectedSize, data.length); } finally { - connection.stop(); + session.close(); } return dest; http://git-wip-us.apache.org/repos/asf/activemq/blob/a49d46e3/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java index bb46f20..28884e6 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java @@ -64,7 +64,7 @@ public class KahaDBMessageStoreSizeStatTest extends @Test public void testMessageSizeAfterRestartAndPublish() throws Exception { - Destination dest = publishTestMessages(200); + Destination dest = publishTestQueueMessages(200); // verify the count and size verifyStats(dest, 200, 200 * messageSize); @@ -72,7 +72,7 @@ public class KahaDBMessageStoreSizeStatTest extends // stop, restart broker and publish more messages stopBroker(); this.setUpBroker(false); - dest = publishTestMessages(200); + dest = publishTestQueueMessages(200); // verify the count and size verifyStats(dest, 400, 400 * messageSize); 
http://git-wip-us.apache.org/repos/asf/activemq/blob/a49d46e3/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java index 4342e1d..849a91b 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java @@ -84,7 +84,7 @@ public class MultiKahaDBMessageStoreSizeStatTest extends @Test public void testMessageSizeAfterRestartAndPublish() throws Exception { - Destination dest = publishTestMessages(200); + Destination dest = publishTestQueueMessages(200); // verify the count and size verifyStats(dest, 200, 200 * messageSize); @@ -92,7 +92,7 @@ public class MultiKahaDBMessageStoreSizeStatTest extends // stop, restart broker and publish more messages stopBroker(); this.setUpBroker(false); - dest = publishTestMessages(200); + dest = publishTestQueueMessages(200); // verify the count and size verifyStats(dest, 400, 400 * messageSize); @@ -102,13 +102,13 @@ public class MultiKahaDBMessageStoreSizeStatTest extends @Test public void testMessageSizeAfterRestartAndPublishMultiQueue() throws Exception { - Destination dest = publishTestMessages(200); + Destination dest = publishTestQueueMessages(200); // verify the count and size verifyStats(dest, 200, 200 * messageSize); assertTrue(broker.getPersistenceAdapter().size() > 200 * messageSize); - Destination dest2 = publishTestMessages(200, "test.queue2"); + Destination dest2 = publishTestQueueMessages(200, "test.queue2"); // verify the count and size verifyStats(dest2, 200, 200 * messageSize); @@ -117,8 +117,8 @@ 
public class MultiKahaDBMessageStoreSizeStatTest extends // stop, restart broker and publish more messages stopBroker(); this.setUpBroker(false); - dest = publishTestMessages(200); - dest2 = publishTestMessages(200, "test.queue2"); + dest = publishTestQueueMessages(200); + dest2 = publishTestQueueMessages(200, "test.queue2"); // verify the count and size after publishing messages verifyStats(dest, 400, 400 * messageSize); http://git-wip-us.apache.org/repos/asf/activemq/blob/a49d46e3/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java index 755936c..dc6ff8b 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java @@ -18,16 +18,21 @@ package org.apache.activemq.store.memory; import java.io.IOException; +import javax.jms.Connection; + +import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Destination; import org.apache.activemq.store.AbstractMessageStoreSizeStatTest; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * This test checks that KahaDB properly sets the new storeMessageSize statistic. 
- * + * * AMQ-5748 - * + * */ public class MemoryMessageStoreSizeStatTest extends AbstractMessageStoreSizeStatTest { protected static final Logger LOG = LoggerFactory @@ -39,7 +44,53 @@ public class MemoryMessageStoreSizeStatTest extends AbstractMessageStoreSizeStat broker.setPersistenceAdapter(new MemoryPersistenceAdapter()); } + @Override + @Test(timeout=10000) + public void testMessageSizeOneDurable() throws Exception { + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); + connection.setClientID("clientId"); + connection.start(); + + //The expected value is only 100 because for durables a LRUCache is being used + //with a max size of 100 + Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1"}, 200, 100); + + //verify the count and size, should be 100 because of the LRUCache + verifyStats(dest, 100, 100 * messageSize); + + consumeDurableTestMessages(connection, "sub1", 100); + + //Since an LRU cache is used and messages are kept in memory, this should be 100 still + verifyStats(dest, 100, 100 * messageSize); + + connection.stop(); + + } + + @Override + @Test(timeout=10000) + public void testMessageSizeTwoDurables() throws Exception { + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); + connection.setClientID("clientId"); + connection.start(); + + //The expected value is only 100 because for durables a LRUCache is being used + //with a max size of 100, so only 100 messages are kept + Destination dest = publishTestMessagesDurable(connection, new String[] {"sub1", "sub2"}, 200, 100); + + //verify the count and size + verifyStats(dest, 100, 100 * messageSize); + + //consume for sub1 + consumeDurableTestMessages(connection, "sub1", 100); + + //Should be 100 messages still + verifyStats(dest, 100, 100 * messageSize); + + connection.stop(); + + } + - }