Author: kwall Date: Fri Jan 8 13:27:43 2016 New Revision: 1723725 URL: http://svn.apache.org/viewvc?rev=1723725&view=rev Log: QPID-6951: AMQSession.deregisterConsumer() leaks Memory
Merged from trunk with command: svn merge -c 1720664,1721151,1721198 https://svn.apache.org/repos/asf/qpid/java/trunk Modified: qpid/java/branches/6.0.x/ (props changed) qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQSession.java qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java Propchange: qpid/java/branches/6.0.x/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Fri Jan 8 13:27:43 2016 @@ -9,5 +9,5 @@ /qpid/branches/java-broker-vhost-refactor/java:1493674-1494547 /qpid/branches/java-network-refactor/qpid/java:805429-821809 /qpid/branches/qpid-2935/qpid/java:1061302-1072333 -/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1719047,1719051,1723064 +/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1719047,1719051,1720664,1721151,1721198,1723064 /qpid/trunk/qpid:796646-796653 Modified: qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQSession.java URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1723725&r1=1723724&r2=1723725&view=diff ============================================================================== --- qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQSession.java (original) +++ qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Jan 8 13:27:43 2016 @@ -27,7 +27,6 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -202,12 +201,6 @@ public abstract class AMQSession<C exten private final Map<Integer,C> _consumers = new ConcurrentHashMap<>(); - /** - * Contains a list of consumers which have been removed but which might still have - * messages to acknowledge, eg in client ack or transacted modes - */ - private CopyOnWriteArrayList<C> _removedConsumers = new CopyOnWriteArrayList<C>(); - /** Provides a count of consumers on destinations, in order to be able to know if a destination has consumers. */ private ConcurrentMap<Destination, AtomicInteger> _destinationConsumerCount = new ConcurrentHashMap<Destination, AtomicInteger>(); @@ -945,7 +938,7 @@ public abstract class AMQSession<C exten private void rejectPending(C consumer) { // Reject messages on pre-receive queue - consumer.rollbackPendingMessages(); + consumer.releasePendingMessages(); // Reject messages on pre-dispatch queue rejectMessagesForConsumerTag(consumer.getConsumerTag()); @@ -2177,13 +2170,6 @@ public abstract class AMQSession<C exten } } } - - // Consumers that are closed in a transaction must be stored - // so that messages they have received can be acknowledged on commit - if (_transacted) - { - _removedConsumers.add(consumer); - } } } @@ -3386,7 +3372,7 @@ public abstract class AMQSession<C exten { if (!consumer.isBrowseOnly()) { - consumer.rollback(); + consumer.releasePendingMessages(); } else { @@ -3396,13 +3382,6 @@ public abstract class AMQSession<C exten } - for (int i = 0; i < _removedConsumers.size(); i++) - { - // Sends acknowledgement to server - _removedConsumers.get(i).rollback(); - _removedConsumers.remove(i); - } - setConnectionStopped(isStopped); } Modified: qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1723725&r1=1723724&r2=1723725&view=diff ============================================================================== --- qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original) +++ qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Fri Jan 8 13:27:43 2016 @@ -654,7 +654,7 @@ public abstract class BasicMessageConsum if(!(isBrowseOnly() || getSession().isClosing())) { - rollback(); + releasePendingMessages(); } } } @@ -887,12 +887,7 @@ public abstract class BasicMessageConsum return _browseOnly; } - public void rollback() - { - rollbackPendingMessages(); - } - - public void rollbackPendingMessages() + void releasePendingMessages() { if (_synchronousQueue.size() > 0) { @@ -942,7 +937,7 @@ public abstract class BasicMessageConsum if (_synchronousQueue.size() != 0) { _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size()); - rollback(); + releasePendingMessages(); } clearReceiveQueue(); Modified: qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1723725&r1=1723724&r2=1723725&view=diff ============================================================================== --- qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original) +++ qpid/java/branches/6.0.x/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Fri Jan 8 13:27:43 2016 @@ -454,7 +454,7 @@ public class BasicMessageConsumer_0_10 e return receiveNoWait(); } - @Override public void rollbackPendingMessages() + @Override void releasePendingMessages() { if (getSynchronousQueue().size() > 0) { Modified: qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java URL: http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java?rev=1723725&r1=1723724&r2=1723725&view=diff ============================================================================== --- qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java (original) +++ qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java Fri Jan 8 13:27:43 2016 @@ -37,6 +37,38 @@ public class MessageConsumerCloseTest e { private volatile Exception _exception; + /** + * JMS Session says "The content of a transaction's input and output units is simply those messages that have + * been produced and consumed within the session's current transaction.". Closing a consumer must not therefore + * prevent previously received messages from being committed. + */ + public void testConsumerCloseAndSessionCommit() throws Exception + { + Connection connection = getConnection(); + connection.start(); + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = getTestQueue(); + MessageConsumer consumer1 = session.createConsumer(destination); + sendMessage(session, destination, 2); + + + Message message = consumer1.receive(RECEIVE_TIMEOUT); + assertNotNull("First message is not received", message); + assertEquals("First message unexpected has unexpected property", 0, message.getIntProperty(INDEX)); + consumer1.close(); + + session.commit(); + + MessageConsumer consumer2 = session.createConsumer(destination); + message = consumer2.receive(RECEIVE_TIMEOUT); + assertNotNull("Second message is not received", message); + assertEquals("Second message unexpected has unexpected property", 1, message.getIntProperty(INDEX)); + + message = consumer2.receive(100l); + assertNull("Unexpected third message", message); + } + + public void testConsumerCloseAndSessionRollback() throws Exception { Connection connection = getConnection(); @@ -108,4 +140,39 @@ public class MessageConsumerCloseTest e assertEquals("Message three has unexpected content", 2, msg3.getIntProperty(INDEX)); session.commit(); } + + public void testMessagesReceivedBeforeConsumerCloseAreRedeliveredAfterRollback() throws Exception + { + Connection connection = getConnection(); + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + + Destination destination = getTestQueue(); + MessageConsumer consumer = session.createConsumer(destination); + + int messageNumber = 4; + connection.start(); + sendMessage(session, destination, messageNumber); + + for(int i = 0; i < messageNumber/2 ; i++) + { + Message message = consumer.receive(RECEIVE_TIMEOUT); + assertNotNull("Message [" + i +"] was null", message); + assertEquals("Message [" + i +"] has unexpected content", i, message.getIntProperty(INDEX)); + } + + consumer.close(); + + session.rollback(); + + MessageConsumer consumer2 = session.createConsumer(destination); + + for(int i = 0; i < messageNumber ; i++) + { + Message message = consumer2.receive(RECEIVE_TIMEOUT); + assertNotNull("Message [" + i +"] was null", message); + assertEquals("Message [" + i +"] has unexpected content", i, message.getIntProperty(INDEX)); + } + + session.commit(); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
