Repository: activemq
Updated Branches:
  refs/heads/trunk c8a5fb769 -> 6aaf859d2
https://issues.apache.org/jira/browse/AMQ-4656 - fix regression for FilePendingDurableSubscriberMessageStoragePolicy Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6aaf859d Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6aaf859d Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6aaf859d Branch: refs/heads/trunk Commit: 6aaf859d22bd0e3ab27a3e74d7b9402521d10573 Parents: c8a5fb7 Author: gtully <[email protected]> Authored: Tue Mar 11 15:18:08 2014 +0000 Committer: gtully <[email protected]> Committed: Tue Mar 11 15:18:46 2014 +0000 ---------------------------------------------------------------------- .../broker/region/DurableTopicSubscription.java | 3 ++- .../org/apache/activemq/bugs/AMQ4656Test.java | 22 ++++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/6aaf859d/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 2fc0b05..8cb6ecc 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -29,6 +29,7 @@ import javax.jms.JMSException; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor; import 
org.apache.activemq.broker.region.policy.PolicyEntry; @@ -160,7 +161,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us } synchronized (pendingLock) { - if (!((StoreDurableSubscriberCursor) pending).isStarted() || !keepDurableSubsActive) { + if (!((AbstractPendingMessageCursor) pending).isStarted() || !keepDurableSubsActive) { pending.setSystemUsage(memoryManager); pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark()); pending.setMaxAuditDepth(getMaxAuditDepth()); http://git-wip-us.apache.org/repos/asf/activemq/blob/6aaf859d/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4656Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4656Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4656Test.java index cf13b72..fcdf23e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4656Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4656Test.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.bugs; +import java.util.Arrays; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -31,12 +32,20 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.BrokerView; import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean; +import org.apache.activemq.broker.region.policy.FilePendingDurableSubscriberMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy; import org.junit.After; import org.junit.Before; import org.junit.Test; +import 
org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RunWith(value = Parameterized.class) public class AMQ4656Test { private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4656Test.class); @@ -45,9 +54,22 @@ public class AMQ4656Test { private String connectionUri; + @Parameterized.Parameter + public PendingDurableSubscriberMessageStoragePolicy pendingDurableSubPolicy; + + @Parameterized.Parameters(name="{0}") + public static Iterable<Object[]> getTestParameters() { + return Arrays.asList(new Object[][]{{new FilePendingDurableSubscriberMessageStoragePolicy()},{new StorePendingDurableSubscriberMessageStoragePolicy()}}); + } + @Before public void setUp() throws Exception { brokerService = new BrokerService(); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setPendingDurableSubscriberPolicy(pendingDurableSubPolicy); + policyMap.setDefaultEntry(defaultEntry); + brokerService.setDestinationPolicy(policyMap); brokerService.setPersistent(false); brokerService.setUseJmx(true); brokerService.setDeleteAllMessagesOnStartup(true);
