Author: robbie
Date: Wed Apr 11 09:00:54 2012
New Revision: 1324655

URL: http://svn.apache.org/viewvc?rev=1324655&view=rev
Log:
QPID-3911: Fix deadlock on concurrent invocation of MessageConsumer#close() and 
Session#rollback() from consumer MessageListener
    
This patch contains the following changes:
- Add synchronization on AMSession#_messageDeliveryLock into 
MessageConsumer#close() in order to block until message listener in progress 
has completed(as required in JMS javadoc for MessageConsumer#close()).
- Change the session dispatcher to stop messages delivery into consumer local 
message queue if the consumer in the process of closing. This eliminates the 
need to stop the dispatcher on rejecting pending messages for closing consumer.
- Remove the synchronization on the dispatcher lock from 
AMQSession.Dispatcher#rejectPending and code to stop the dispatcher, as we are 
synchronizing on the deliveryLock now and incoming messages are not dispatched 
into closing consumers anymore.
- Add a system test to reproduce the deadlock and verify its resolution.
    
Applied patch from Oleksandr Rudyy <[email protected]>

merged from trunk r1310275

Added:
    
qpid/branches/0.16/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
      - copied unchanged from r1310275, 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
Modified:
    qpid/branches/0.16/qpid/java/   (props changed)
    
qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java

Propchange: qpid/branches/0.16/qpid/java/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java:r1310275

Modified: 
qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1324655&r1=1324654&r2=1324655&view=diff
==============================================================================
--- 
qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Wed Apr 11 09:00:54 2012
@@ -3206,28 +3206,15 @@ public abstract class AMQSession<C exten
 
         public void rejectPending(C consumer)
         {
-            synchronized (_lock)
-            {
-                boolean stopped = connectionStopped();
+            // Reject messages on pre-receive queue
+            consumer.rollbackPendingMessages();
 
-                if (!stopped)
-                {
-                    setConnectionStopped(true);
-                }
+            // Reject messages on pre-dispatch queue
+            rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, 
false);
 
-                // Reject messages on pre-receive queue
-                consumer.rollbackPendingMessages();
+            // closeConsumer
+            consumer.markClosed();
 
-                // Reject messages on pre-dispatch queue
-                rejectMessagesForConsumerTag(consumer.getConsumerTag(), true, 
false);
-                //Let the dispatcher deal with this when it gets to them.
-
-                // closeConsumer
-                consumer.markClosed();
-
-                setConnectionStopped(stopped);
-
-            }
         }
 
         public void rollback()
@@ -3419,7 +3406,7 @@ public abstract class AMQSession<C exten
         {
             final C consumer = _consumers.get(message.getConsumerTag());
 
-            if ((consumer == null) || consumer.isClosed())
+            if ((consumer == null) || consumer.isClosed() || 
consumer.isClosing())
             {
                 if (_dispatcherLogger.isInfoEnabled())
                 {

Modified: 
qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1324655&r1=1324654&r2=1324655&view=diff
==============================================================================
--- 
qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 Wed Apr 11 09:00:54 2012
@@ -590,7 +590,10 @@ public abstract class BasicMessageConsum
                         // no point otherwise as the connection will be gone
                         if (!_session.isClosed() || _session.isClosing())
                         {
-                            sendCancel();
+                            synchronized(_session.getMessageDeliveryLock())
+                            {
+                                sendCancel();
+                            }
                             cleanupQueue();
                         }
                     }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to