Author: gtully
Date: Fri Mar 16 14:55:38 2012
New Revision: 1301565

URL: http://svn.apache.org/viewvc?rev=1301565&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3733: Topic subscriber is assumed to 
be a slow consumer when prefetch is set to one. Thanks for the great test case. 
Fixed up the logic used to determine the slowness of a subscription so that it 
takes into account the pending messages and the prefetch. The slow-consumer 
check is now only applicable when prefetch > 1 and the pending message strategy 
keeps messages in memory.

Added:
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionSlowConsumerTest.java
   (with props)
Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=1301565&r1=1301564&r2=1301565&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
 Fri Mar 16 14:55:38 2012
@@ -99,12 +99,14 @@ public class TopicSubscription extends A
             dispatch(node);
             setSlowConsumer(false);
         } else {
-            //we are slow
-            if(!isSlowConsumer()) {
-                LOG.warn(toString() + ": has reached its prefetch limit 
without an ack, it appears to be slow");
-                setSlowConsumer(true);
-                for (Destination dest: destinations) {
-                    dest.slowConsumer(getContext(), this);
+            if ( info.getPrefetchSize() > 1 && matched.size() > 
info.getPrefetchSize() ) {
+                //we are slow
+                if(!isSlowConsumer()) {
+                    LOG.warn(toString() + ": has twice its prefetch limit 
pending, without an ack; it appears to be slow");
+                    setSlowConsumer(true);
+                    for (Destination dest: destinations) {
+                        dest.slowConsumer(getContext(), this);
+                    }
                 }
             }
             if (maximumPendingMessages != 0) {

Added: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionSlowConsumerTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionSlowConsumerTest.java?rev=1301565&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionSlowConsumerTest.java
 (added)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionSlowConsumerTest.java
 Fri Mar 16 14:55:38 2012
@@ -0,0 +1,111 @@
+package org.apache.activemq.usecases;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.Assert;
+
+
+/**
+ * Checks to see if "slow consumer advisory messages" are generated when 
+ * small number of messages (2) are published to a topic which has a 
subscriber 
+ * with a prefetch of one set.
+ * 
+ */
+
+public class TopicSubscriptionSlowConsumerTest extends TestCase {
+
+       private static final String TOPIC_NAME = "slow.consumer";
+       Connection connection;
+       private Session session;
+       private ActiveMQTopic destination;
+       private MessageProducer producer;
+       private MessageConsumer consumer;
+       private BrokerService brokerService;
+
+       
+       public void setUp() throws Exception {
+
+               brokerService = createBroker();
+               
+               ActiveMQConnectionFactory activeMQConnectionFactory = new 
ActiveMQConnectionFactory("vm://localhost");
+               
+               activeMQConnectionFactory.setWatchTopicAdvisories(true);
+               connection = activeMQConnectionFactory.createConnection();
+               session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+               destination = new ActiveMQTopic(TOPIC_NAME);
+               producer = session.createProducer(destination);
+               
+               connection.start();
+       }
+
+       
+       
+       public void testPrefetchValueOne() throws Exception{
+               
+               ActiveMQTopic consumerDestination = new 
ActiveMQTopic(TOPIC_NAME+"?consumer.prefetchSize=1");
+               consumer = session.createConsumer(consumerDestination);
+               
+               //add a consumer to the slow consumer advisory topic. 
+               ActiveMQTopic slowConsumerAdvisoryTopic = 
AdvisorySupport.getSlowConsumerAdvisoryTopic(destination);
+               MessageConsumer slowConsumerAdvisory = 
session.createConsumer(slowConsumerAdvisoryTopic);
+               
+               //publish 2 messages
+               Message txtMessage = session.createTextMessage("Sample Text 
Message");
+               for(int i= 0; i<2; i++){
+                       producer.send(txtMessage);
+               }
+               
+               //consume 2 messages
+               for(int i= 0; i<2; i++){
+                       Message receivedMsg = consumer.receive(100);
+                       Assert.assertNotNull("received msg "+i+" should not be 
null",receivedMsg);
+               }
+
+               //check for "slow consumer" advisory message
+               Message slowAdvisoryMessage = slowConsumerAdvisory.receive(100);
+               Assert.assertNull( "should not have received a slow consumer 
advisory message",slowAdvisoryMessage);
+               
+       }
+
+       
+
+       public void tearDown() throws Exception {
+               consumer.close();
+               producer.close();
+               session.close();
+               connection.close();
+               brokerService.stop();
+       }
+       
+       
+       //helper method to create a broker with slow consumer advisory turned on
+       private BrokerService createBroker() throws Exception {
+               BrokerService broker = new BrokerService();
+               broker.setBrokerName("localhost");
+               broker.setUseJmx(true);
+               broker.setDeleteAllMessagesOnStartup(true);
+               broker.addConnector("vm://localhost");
+
+               PolicyMap policyMap = new PolicyMap();
+               PolicyEntry defaultEntry = new PolicyEntry();
+               defaultEntry.setAdvisoryForSlowConsumers(true);
+
+               policyMap.setDefaultEntry(defaultEntry);
+
+               broker.setDestinationPolicy(policyMap);
+               broker.start();
+               broker.waitUntilStarted();
+               return broker;
+       }
+
+}

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionSlowConsumerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TopicSubscriptionSlowConsumerTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to