Author: rgodfrey
Date: Tue Jan 20 16:47:50 2015
New Revision: 1653293

URL: http://svn.apache.org/r1653293
Log:
QPID-6294 : Fix issue whereby AUTO ACK receive consumer would only ever receive 
one message since _currentPrefetch would never be zeroed

Modified:
    
qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java

Modified: 
qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java?rev=1653293&r1=1653292&r2=1653293&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java
 (original)
+++ 
qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/Hello.java
 Tue Jan 20 16:47:50 2015
@@ -34,6 +34,9 @@ import javax.jms.TextMessage;
 import javax.naming.Context;
 import javax.naming.InitialContext;
 
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.configuration.ClientProperties;
+
 
 public class Hello 
 {
@@ -42,10 +45,38 @@ public class Hello
     {
     }
 
-    public static void main(String[] args) 
+    public static void main(String[] args) throws Exception
     {
-        Hello hello = new Hello();
-        hello.runTest();
+        System.setProperty(ClientProperties.AMQP_VERSION, "0-91");
+        System.setProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "0");
+        System.setProperty(ClientProperties.DEST_SYNTAX, "BURL");
+
+        Connection conn = new AMQConnection("127.0.0.1", 5672, 
"admin","admin", "client", "/");
+
+        conn.start();
+
+        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue("queue");
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+
+        for(int i = 0 ; i < 2 ; i ++)
+        {
+            TextMessage message = (TextMessage) consumer.receive(1000l);
+            System.out.println(message == null ? "null" : message.getText());
+        }
+        for(int i = 0 ; i < 2 ; i ++)
+        {
+            TextMessage message = session.createTextMessage("Hello " + i);
+            producer.send(message);
+        }
+
+        for(int i = 0 ; i < 2 ; i ++)
+        {
+            TextMessage message = (TextMessage) consumer.receive(1000l);
+            System.out.println(message == null ? "null" : message.getText());
+        }
     }
 
     private void runTest() 

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java?rev=1653293&r1=1653292&r2=1653293&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
 Tue Jan 20 16:47:50 2015
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
 
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.Session;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -182,7 +183,8 @@ public class BasicMessageConsumer_0_8 ex
             {
                 getSession().reduceCreditAfterAcknowledge();
             }
-            if (manageCredit && message != null)
+            if (manageCredit && !(getSession().getAcknowledgeMode() == 
Session.AUTO_ACKNOWLEDGE
+                                  || getSession().getAcknowledgeMode() == 
Session.DUPS_OK_ACKNOWLEDGE) && message != null)
             {
                 getSession().updateCurrentPrefetch(1);
             }
@@ -214,7 +216,8 @@ public class BasicMessageConsumer_0_8 ex
             {
                 getSession().reduceCreditAfterAcknowledge();
             }
-            if (manageCredit && message != null)
+            if (manageCredit && !(getSession().getAcknowledgeMode() == 
Session.AUTO_ACKNOWLEDGE
+                                  || getSession().getAcknowledgeMode() == 
Session.DUPS_OK_ACKNOWLEDGE) && message != null)
             {
                 getSession().updateCurrentPrefetch(1);
             }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to