Author: rajith
Date: Tue Jan 24 23:40:18 2012
New Revision: 1235556

URL: http://svn.apache.org/viewvc?rev=1235556&view=rev
Log:
QPID-3604 Added a test case to cover this issue.

Modified:
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java

Modified: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java?rev=1235556&r1=1235555&r2=1235556&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
 (original)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
 Tue Jan 24 23:40:18 2012
@@ -25,12 +25,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.Connection;
+import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -192,5 +194,42 @@ public class PrefetchBehaviourTest exten
             assertEquals("Consumer A received message with unexpected index", 
i, msgConsumerA.getIntProperty(INDEX));
         }
     }
+
+    /**
+     * Test Goal: Verify if connection stop releases all messages in it's 
prefetch buffer.
+     * Test Strategy: Send 10 messages to a queue. Create a consumer with 
maxprefetch of 5, but never consume them.
+     *                Stop the connection. Create a new connection and a 
consumer with maxprefetch 10 on the same queue.
+     *                Try to receive all 10 messages.
+     */
+    public void testConnectionStop() throws Exception
+    {
+        setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, 
"10");
+        Connection con = getConnection();
+        con.start();
+        Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination queue = ssn.createQueue("ADDR:my-queue;{create: always}");
+
+        MessageProducer prod = ssn.createProducer(queue);
+        for (int i=0; i<10;i++)
+        {
+           prod.send(ssn.createTextMessage("Msg" + i));
+        }
+
+        MessageConsumer consumer = ssn.createConsumer(queue);
+        // This is to ensure we get the first client to prefetch.
+        Message msg = consumer.receive(1000);
+        assertNotNull("The first consumer should get one message",msg);
+        con.stop();
+
+        Connection con2 = getConnection();
+        con2.start();
+        Session ssn2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer2 = ssn2.createConsumer(queue);
+        for (int i=0; i<9;i++)
+        {
+           TextMessage m = (TextMessage)consumer2.receive(1000);
+           assertNotNull("The second consumer should get 9 messages, but 
received only " + i,m);
+        }
+    }
 }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to