Author: kwall
Date: Fri Jan  8 13:27:43 2016
New Revision: 1723725

URL: http://svn.apache.org/viewvc?rev=1723725&view=rev
Log:
QPID-6951: AMQSession.deregisterConsumer() leaks Memory

Merged from trunk with command:

svn merge -c 1720664,1721151,1721198 https://svn.apache.org/repos/asf/qpid/java/trunk


Modified:
    qpid/java/branches/6.0.x/   (props changed)
    qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java

Propchange: qpid/java/branches/6.0.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan  8 13:27:43 2016
@@ -9,5 +9,5 @@
 /qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
 /qpid/branches/java-network-refactor/qpid/java:805429-821809
 /qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1719047,1719051,1723064
+/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1719047,1719051,1720664,1721151,1721198,1723064
 /qpid/trunk/qpid:796646-796653

Modified: qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1723725&r1=1723724&r2=1723725&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Jan  8 13:27:43 2016
@@ -27,7 +27,6 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -202,12 +201,6 @@ public abstract class AMQSession<C exten
 
     private final Map<Integer,C> _consumers = new ConcurrentHashMap<>();
 
-    /**
-     * Contains a list of consumers which have been removed but which might 
still have
-     * messages to acknowledge, eg in client ack or transacted modes
-     */
-    private CopyOnWriteArrayList<C> _removedConsumers = new 
CopyOnWriteArrayList<C>();
-
     /** Provides a count of consumers on destinations, in order to be able to 
know if a destination has consumers. */
     private ConcurrentMap<Destination, AtomicInteger> 
_destinationConsumerCount =
             new ConcurrentHashMap<Destination, AtomicInteger>();
@@ -945,7 +938,7 @@ public abstract class AMQSession<C exten
     private void rejectPending(C consumer)
     {
         // Reject messages on pre-receive queue
-        consumer.rollbackPendingMessages();
+        consumer.releasePendingMessages();
 
         // Reject messages on pre-dispatch queue
         rejectMessagesForConsumerTag(consumer.getConsumerTag());
@@ -2177,13 +2170,6 @@ public abstract class AMQSession<C exten
                     }
                 }
             }
-
-            // Consumers that are closed in a transaction must be stored
-            // so that messages they have received can be acknowledged on 
commit
-            if (_transacted)
-            {
-                _removedConsumers.add(consumer);
-            }
         }
     }
 
@@ -3386,7 +3372,7 @@ public abstract class AMQSession<C exten
                 {
                     if (!consumer.isBrowseOnly())
                     {
-                        consumer.rollback();
+                        consumer.releasePendingMessages();
                     }
                     else
                     {
@@ -3396,13 +3382,6 @@ public abstract class AMQSession<C exten
 
                 }
 
-                for (int i = 0; i < _removedConsumers.size(); i++)
-                {
-                    // Sends acknowledgement to server
-                    _removedConsumers.get(i).rollback();
-                    _removedConsumers.remove(i);
-                }
-
                 setConnectionStopped(isStopped);
             }
 

Modified: qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1723725&r1=1723724&r2=1723725&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Fri Jan  8 13:27:43 2016
@@ -654,7 +654,7 @@ public abstract class BasicMessageConsum
 
             if(!(isBrowseOnly() || getSession().isClosing()))
             {
-                rollback();
+                releasePendingMessages();
             }
         }
     }
@@ -887,12 +887,7 @@ public abstract class BasicMessageConsum
         return _browseOnly;
     }
 
-    public void rollback()
-    {
-        rollbackPendingMessages();
-    }
-
-    public void rollbackPendingMessages()
+    void releasePendingMessages()
     {
         if (_synchronousQueue.size() > 0)
         {
@@ -942,7 +937,7 @@ public abstract class BasicMessageConsum
             if (_synchronousQueue.size() != 0)
             {
                 _logger.warn("Queue was not empty after rejecting all messages 
Remaining:" + _synchronousQueue.size());
-                rollback();
+                releasePendingMessages();
             }
 
             clearReceiveQueue();

Modified: qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1723725&r1=1723724&r2=1723725&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Fri Jan  8 13:27:43 2016
@@ -454,7 +454,7 @@ public class BasicMessageConsumer_0_10 e
         return receiveNoWait();
     }
 
-    @Override public void rollbackPendingMessages()
+    @Override void releasePendingMessages()
     {
         if (getSynchronousQueue().size() > 0)
         {

Modified: qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java?rev=1723725&r1=1723724&r2=1723725&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java (original)
+++ qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java Fri Jan  8 13:27:43 2016
@@ -37,6 +37,38 @@ public class MessageConsumerCloseTest  e
 {
     private volatile Exception _exception;
 
+    /**
+     * JMS Session says "The content of a transaction's input and output units 
is simply those messages that have
+     * been produced and consumed within the session's current transaction.".  
Closing a consumer must not therefore
+     * prevent previously received messages from being committed.
+     */
+    public void testConsumerCloseAndSessionCommit() throws Exception
+    {
+        Connection connection = getConnection();
+        connection.start();
+        final Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+        Destination destination = getTestQueue();
+        MessageConsumer consumer1 = session.createConsumer(destination);
+        sendMessage(session, destination, 2);
+
+
+        Message message = consumer1.receive(RECEIVE_TIMEOUT);
+        assertNotNull("First message is not received", message);
+        assertEquals("First message unexpected has unexpected property", 0, 
message.getIntProperty(INDEX));
+        consumer1.close();
+
+        session.commit();
+
+        MessageConsumer consumer2 = session.createConsumer(destination);
+        message = consumer2.receive(RECEIVE_TIMEOUT);
+        assertNotNull("Second message is not received", message);
+        assertEquals("Second message unexpected has unexpected property", 1, 
message.getIntProperty(INDEX));
+
+        message = consumer2.receive(100l);
+        assertNull("Unexpected third message", message);
+    }
+
+
     public void testConsumerCloseAndSessionRollback() throws Exception
     {
         Connection connection = getConnection();
@@ -108,4 +140,39 @@ public class MessageConsumerCloseTest  e
         assertEquals("Message three has unexpected content", 2, 
msg3.getIntProperty(INDEX));
         session.commit();
     }
+
+    public void 
testMessagesReceivedBeforeConsumerCloseAreRedeliveredAfterRollback() throws 
Exception
+    {
+        Connection connection = getConnection();
+        final Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+
+        Destination destination = getTestQueue();
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        int messageNumber = 4;
+        connection.start();
+        sendMessage(session, destination, messageNumber);
+
+        for(int i = 0; i < messageNumber/2 ; i++)
+        {
+            Message message = consumer.receive(RECEIVE_TIMEOUT);
+            assertNotNull("Message [" + i +"] was null", message);
+            assertEquals("Message [" + i +"] has unexpected content", i, 
message.getIntProperty(INDEX));
+        }
+
+        consumer.close();
+
+        session.rollback();
+
+        MessageConsumer consumer2 = session.createConsumer(destination);
+
+        for(int i = 0; i < messageNumber ; i++)
+        {
+            Message message = consumer2.receive(RECEIVE_TIMEOUT);
+            assertNotNull("Message [" + i +"] was null", message);
+            assertEquals("Message [" + i +"] has unexpected content", i, 
message.getIntProperty(INDEX));
+        }
+
+        session.commit();
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to