Author: gtully
Date: Fri Mar 16 14:55:38 2012
New Revision: 1301565
URL: http://svn.apache.org/viewvc?rev=1301565&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3733: Topic subscriber is assumed to
be slow consumer when prefetch is set to one. Thanks for the great test case.
Fixed up the logic used to determine slowness of a sub to take into account the
pending messages and prefetch. It is now only applicable when prefetch > 1 and
the pending message strategy keeps messages in memory
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionSlowConsumerTest.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=1301565&r1=1301564&r2=1301565&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Fri Mar 16 14:55:38 2012
@@ -99,12 +99,14 @@ public class TopicSubscription extends A
dispatch(node);
setSlowConsumer(false);
} else {
- //we are slow
- if(!isSlowConsumer()) {
- LOG.warn(toString() + ": has reached its prefetch limit
without an ack, it appears to be slow");
- setSlowConsumer(true);
- for (Destination dest: destinations) {
- dest.slowConsumer(getContext(), this);
+ if ( info.getPrefetchSize() > 1 && matched.size() >
info.getPrefetchSize() ) {
+ //we are slow
+ if(!isSlowConsumer()) {
+ LOG.warn(toString() + ": has twice its prefetch limit
pending, without an ack; it appears to be slow");
+ setSlowConsumer(true);
+ for (Destination dest: destinations) {
+ dest.slowConsumer(getContext(), this);
+ }
}
}
if (maximumPendingMessages != 0) {
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionSlowConsumerTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionSlowConsumerTest.java?rev=1301565&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionSlowConsumerTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionSlowConsumerTest.java
Fri Mar 16 14:55:38 2012
@@ -0,0 +1,111 @@
+package org.apache.activemq.usecases;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.Assert;
+
+
+/**
+ * Checks to see if "slow consumer advisory messages" are generated when
+ * small number of messages (2) are published to a topic which has a
subscriber
+ * with a prefetch of one set.
+ *
+ */
+
+public class TopicSubscriptionSlowConsumerTest extends TestCase {
+
+ private static final String TOPIC_NAME = "slow.consumer";
+ Connection connection;
+ private Session session;
+ private ActiveMQTopic destination;
+ private MessageProducer producer;
+ private MessageConsumer consumer;
+ private BrokerService brokerService;
+
+
+ public void setUp() throws Exception {
+
+ brokerService = createBroker();
+
+ ActiveMQConnectionFactory activeMQConnectionFactory = new
ActiveMQConnectionFactory("vm://localhost");
+
+ activeMQConnectionFactory.setWatchTopicAdvisories(true);
+ connection = activeMQConnectionFactory.createConnection();
+ session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ destination = new ActiveMQTopic(TOPIC_NAME);
+ producer = session.createProducer(destination);
+
+ connection.start();
+ }
+
+
+
+ public void testPrefetchValueOne() throws Exception{
+
+ ActiveMQTopic consumerDestination = new
ActiveMQTopic(TOPIC_NAME+"?consumer.prefetchSize=1");
+ consumer = session.createConsumer(consumerDestination);
+
+ //add a consumer to the slow consumer advisory topic.
+ ActiveMQTopic slowConsumerAdvisoryTopic =
AdvisorySupport.getSlowConsumerAdvisoryTopic(destination);
+ MessageConsumer slowConsumerAdvisory =
session.createConsumer(slowConsumerAdvisoryTopic);
+
+ //publish 2 messages
+ Message txtMessage = session.createTextMessage("Sample Text
Message");
+ for(int i= 0; i<2; i++){
+ producer.send(txtMessage);
+ }
+
+ //consume 2 messages
+ for(int i= 0; i<2; i++){
+ Message receivedMsg = consumer.receive(100);
+ Assert.assertNotNull("received msg "+i+" should not be
null",receivedMsg);
+ }
+
+ //check for "slow consumer" advisory message
+ Message slowAdvisoryMessage = slowConsumerAdvisory.receive(100);
+ Assert.assertNull( "should not have received a slow consumer
advisory message",slowAdvisoryMessage);
+
+ }
+
+
+
+ public void tearDown() throws Exception {
+ consumer.close();
+ producer.close();
+ session.close();
+ connection.close();
+ brokerService.stop();
+ }
+
+
+ //helper method to create a broker with slow consumer advisory turned on
+ private BrokerService createBroker() throws Exception {
+ BrokerService broker = new BrokerService();
+ broker.setBrokerName("localhost");
+ broker.setUseJmx(true);
+ broker.setDeleteAllMessagesOnStartup(true);
+ broker.addConnector("vm://localhost");
+
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry defaultEntry = new PolicyEntry();
+ defaultEntry.setAdvisoryForSlowConsumers(true);
+
+ policyMap.setDefaultEntry(defaultEntry);
+
+ broker.setDestinationPolicy(policyMap);
+ broker.start();
+ broker.waitUntilStarted();
+ return broker;
+ }
+
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionSlowConsumerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionSlowConsumerTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date