Author: orudyy
Date: Mon Dec 21 13:33:03 2015
New Revision: 1721151

URL: http://svn.apache.org/viewvc?rev=1721151&view=rev
Log:
QPID-6951: Release consumer prefetched messages on consumer close regardless 
whether session is closed or not. Rename/remove consumer methods to have 
sensible method names indicating what exactly method is really doing. Add 
system test

Modified:
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1721151&r1=1721150&r2=1721151&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java 
(original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQSession.java 
Mon Dec 21 13:33:03 2015
@@ -27,7 +27,6 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -939,7 +938,7 @@ public abstract class AMQSession<C exten
     private void rejectPending(C consumer)
     {
         // Reject messages on pre-receive queue
-        consumer.rollbackPendingMessages();
+        consumer.releasePendingMessages();
 
         // Reject messages on pre-dispatch queue
         rejectMessagesForConsumerTag(consumer.getConsumerTag());
@@ -3373,7 +3372,7 @@ public abstract class AMQSession<C exten
                 {
                     if (!consumer.isBrowseOnly())
                     {
-                        consumer.rollback();
+                        consumer.releasePendingMessages();
                     }
                     else
                     {

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1721151&r1=1721150&r2=1721151&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 Mon Dec 21 13:33:03 2015
@@ -652,9 +652,9 @@ public abstract class BasicMessageConsum
             }
 
 
-            if(!(isBrowseOnly() || getSession().isClosing()))
+            if(!isBrowseOnly())
             {
-                rollback();
+                releasePendingMessages();
             }
         }
     }
@@ -887,12 +887,7 @@ public abstract class BasicMessageConsum
         return _browseOnly;
     }
 
-    public void rollback()
-    {
-        rollbackPendingMessages();
-    }
-
-    public void rollbackPendingMessages()
+    void releasePendingMessages()
     {
         if (_synchronousQueue.size() > 0)
         {
@@ -942,7 +937,7 @@ public abstract class BasicMessageConsum
             if (_synchronousQueue.size() != 0)
             {
                 _logger.warn("Queue was not empty after rejecting all messages 
Remaining:" + _synchronousQueue.size());
-                rollback();
+                releasePendingMessages();
             }
 
             clearReceiveQueue();

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1721151&r1=1721150&r2=1721151&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 (original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 Mon Dec 21 13:33:03 2015
@@ -454,7 +454,7 @@ public class BasicMessageConsumer_0_10 e
         return receiveNoWait();
     }
 
-    @Override public void rollbackPendingMessages()
+    @Override void releasePendingMessages()
     {
         if (getSynchronousQueue().size() > 0)
         {

Modified: 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java?rev=1721151&r1=1721150&r2=1721151&view=diff
==============================================================================
--- 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
 (original)
+++ 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
 Mon Dec 21 13:33:03 2015
@@ -108,4 +108,39 @@ public class MessageConsumerCloseTest  e
         assertEquals("Message three has unexpected content", 2, 
msg3.getIntProperty(INDEX));
         session.commit();
     }
+
+    public void 
testMessagesReceivedBeforeConsumerCloseAreRedeliveredAfterRollback() throws 
Exception
+    {
+        Connection connection = getConnection();
+        final Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+
+        Destination destination = getTestQueue();
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        int messageNumber = 4;
+        connection.start();
+        sendMessage(session, destination, messageNumber);
+
+        for(int i = 0; i < messageNumber/2 ; i++)
+        {
+            Message message = consumer.receive(RECEIVE_TIMEOUT);
+            assertNotNull("Message [" + i +"] was null", message);
+            assertEquals("Message [" + i +"] has unexpected content", i, 
message.getIntProperty(INDEX));
+        }
+
+        consumer.close();
+
+        session.rollback();
+
+        MessageConsumer consumer2 = session.createConsumer(destination);
+
+        for(int i = 0; i < messageNumber ; i++)
+        {
+            Message message = consumer2.receive(RECEIVE_TIMEOUT);
+            assertNotNull("Message [" + i +"] was null", message);
+            assertEquals("Message [" + i +"] has unexpected content", i, 
message.getIntProperty(INDEX));
+        }
+
+        session.commit();
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to