Repository: activemq Updated Branches: refs/heads/master 41ee3ec8d -> de24980a6
AMQ-5748 - Fixing MessageStore cache This fixes KahaDBStore to properly check for an existing MessageStore in the cache before creating a new one. This will prevent potential issues with metrics. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/de24980a Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/de24980a Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/de24980a Branch: refs/heads/master Commit: de24980a623c864d24cecf9ed852bec38cf09ae3 Parents: 41ee3ec Author: Christopher L. Shannon (cshannon) <[email protected]> Authored: Fri Aug 7 17:13:37 2015 +0000 Committer: Christopher L. Shannon (cshannon) <[email protected]> Committed: Fri Aug 7 17:23:25 2015 +0000 ---------------------------------------------------------------------- .../activemq/store/kahadb/KahaDBStore.java | 26 ++++++++++++++++---- .../store/AbstractMessageStoreSizeStatTest.java | 20 +++++++++++++++ 2 files changed, 41 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/de24980a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index bf14d69..84aba07 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -1004,16 +1004,32 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { @Override public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { - MessageStore store = this.transactionStore.proxy(new KahaDBMessageStore(destination)); - storeCache.put(key(convert(destination)), store); + String key = key(convert(destination)); + MessageStore store = storeCache.get(key(convert(destination))); + if (store == null) { + final MessageStore queueStore = this.transactionStore.proxy(new KahaDBMessageStore(destination)); + store = storeCache.putIfAbsent(key, queueStore); + if (store == null) { + store = queueStore; + } + } + return store; } @Override public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { - TopicMessageStore store = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination)); - storeCache.put(key(convert(destination)), store); - return store; + String key = key(convert(destination)); + MessageStore store = storeCache.get(key(convert(destination))); + if (store == null) { + final TopicMessageStore topicStore = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination)); + store = storeCache.putIfAbsent(key, topicStore); + if (store == null) { + store = topicStore; + } + } + + return (TopicMessageStore) store; } /** http://git-wip-us.apache.org/repos/asf/activemq/blob/de24980a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java index 59ae44b..1b927b4 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.store; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -34,10 +35,14 @@ import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicSession; +import javax.management.ObjectName; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; @@ -247,6 +252,12 @@ public abstract class AbstractMessageStoreSizeStatTest { Topic topic = session.createTopic("test.topic"); session.createDurableSubscriber(topic, "sub1"); + // browse the durable sub - this test is to verify that browsing (which calls createTopicMessageStore) + //in KahaDBStore will not create a brand new store (ie uses the cache) If the cache is not used, + //then the statistics won't be updated properly because a new store would overwrite the old store + //which is still in use + ObjectName[] subs = broker.getAdminView().getDurableTopicSubscribers(); + try { // publish a bunch of non-persistent messages to fill up the temp // store @@ -256,6 +267,15 @@ public abstract class AbstractMessageStoreSizeStatTest { prod.send(createMessage(session)); } + //verify the view has 200 messages + assertEquals(1, subs.length); + ObjectName subName = subs[0]; + DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean) + broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true); + CompositeData[] data = sub.browse(); + assertNotNull(data); + assertEquals(200, data.length); + } finally { connection.stop(); }
