Author: gtully
Date: Fri Dec 17 18:33:13 2010
New Revision: 1050463
URL: http://svn.apache.org/viewvc?rev=1050463&view=rev
Log:
resolve: https://issues.apache.org/jira/browse/AMQ-3095 - with test
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=1050463&r1=1050462&r2=1050463&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Fri Dec 17 18:33:13 2010
@@ -209,7 +209,7 @@ public class PolicyEntry extends Destina
int prefetch = sub.getPrefetchSize();
sub.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
//override prefetch size if not set by the Consumer
- if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH){
+ if (prefetch == ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH || prefetch == ActiveMQPrefetchPolicy.DEFAULT_OPTIMIZE_DURABLE_TOPIC_PREFETCH){
sub.setPrefetchSize(getDurableTopicPrefetch());
}
if (pendingDurableSubscriberPolicy != null) {
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java?rev=1050463&r1=1050462&r2=1050463&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java Fri Dec 17 18:33:13 2010
@@ -42,11 +42,15 @@ import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import javax.management.ObjectName;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerView;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBStore;
@@ -297,6 +301,30 @@ public class DurableConsumerTest extends
public void testConsumer() throws Exception{
doTestConsumer(false);
}
+
+ public void testPrefetchViaBrokerConfig() throws Exception {
+
+ Integer prefetchVal = new Integer(150);
+ PolicyEntry policyEntry = new PolicyEntry();
+ policyEntry.setDurableTopicPrefetch(prefetchVal.intValue());
+ policyEntry.setPrioritizedMessages(true);
+ PolicyMap policyMap = new PolicyMap();
+ policyMap.setDefaultEntry(policyEntry);
+ broker.setDestinationPolicy(policyMap);
+ broker.start();
+
+ factory = createConnectionFactory();
+ Connection consumerConnection = factory.createConnection();
+ consumerConnection.setClientID(CONSUMER_NAME);
+ Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = consumerSession.createTopic(getClass().getName());
+ MessageConsumer consumer = consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
+ consumerConnection.start();
+
+ ObjectName activeSubscriptionObjectName = broker.getAdminView().getDurableTopicSubscribers()[0];
+ Object prefetchFromSubView = broker.getManagementContext().getAttribute(activeSubscriptionObjectName, "PrefetchSize");
+ assertEquals(prefetchVal, prefetchFromSubView);
+ }
public void doTestConsumer(boolean forceRecover) throws Exception{
@@ -407,7 +435,6 @@ public class DurableConsumerTest extends
answer.setPersistenceAdapter(kaha);
answer.addConnector(bindAddress);
answer.setUseShutdownHook(false);
- answer.setUseJmx(false);
answer.setAdvisorySupport(false);
answer.setDedicatedTaskRunner(useDedicatedTaskRunner);
}