Author: robbie
Date: Wed Apr 11 09:00:54 2012
New Revision: 1324655
URL: http://svn.apache.org/viewvc?rev=1324655&view=rev
Log:
QPID-3911: Fix deadlock on concurrent invocation of MessageConsumer#close() and
Session#rollback() from consumer MessageListener
This patch contains the following changes:
- Add synchronization on AMQSession#_messageDeliveryLock to
MessageConsumer#close() in order to block until any message listener in progress
has completed (as required by the JMS javadoc for MessageConsumer#close()).
- Change the session dispatcher to stop delivering messages into the consumer's
local message queue if the consumer is in the process of closing. This eliminates
the need to stop the dispatcher when rejecting pending messages for a closing consumer.
- Remove the synchronization on the dispatcher lock from
AMQSession.Dispatcher#rejectPending, along with the code that stopped the
dispatcher, as we now synchronize on the deliveryLock and incoming messages are
no longer dispatched to closing consumers.
- Add a system test to reproduce the deadlock and verify its resolution.
Applied patch from Oleksandr Rudyy <[email protected]>
merged from trunk r1310275
Added:
qpid/branches/0.16/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
- copied unchanged from r1310275,
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageConsumerCloseTest.java
Modified:
qpid/branches/0.16/qpid/java/ (props changed)
qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Propchange: qpid/branches/0.16/qpid/java/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java:r1310275
Modified:
qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1324655&r1=1324654&r2=1324655&view=diff
==============================================================================
---
qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Wed Apr 11 09:00:54 2012
@@ -3206,28 +3206,15 @@ public abstract class AMQSession<C exten
public void rejectPending(C consumer)
{
- synchronized (_lock)
- {
- boolean stopped = connectionStopped();
+ // Reject messages on pre-receive queue
+ consumer.rollbackPendingMessages();
- if (!stopped)
- {
- setConnectionStopped(true);
- }
+ // Reject messages on pre-dispatch queue
+ rejectMessagesForConsumerTag(consumer.getConsumerTag(), true,
false);
- // Reject messages on pre-receive queue
- consumer.rollbackPendingMessages();
+ // closeConsumer
+ consumer.markClosed();
- // Reject messages on pre-dispatch queue
- rejectMessagesForConsumerTag(consumer.getConsumerTag(), true,
false);
- //Let the dispatcher deal with this when it gets to them.
-
- // closeConsumer
- consumer.markClosed();
-
- setConnectionStopped(stopped);
-
- }
}
public void rollback()
@@ -3419,7 +3406,7 @@ public abstract class AMQSession<C exten
{
final C consumer = _consumers.get(message.getConsumerTag());
- if ((consumer == null) || consumer.isClosed())
+ if ((consumer == null) || consumer.isClosed() ||
consumer.isClosing())
{
if (_dispatcherLogger.isInfoEnabled())
{
Modified:
qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1324655&r1=1324654&r2=1324655&view=diff
==============================================================================
---
qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
qpid/branches/0.16/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Wed Apr 11 09:00:54 2012
@@ -590,7 +590,10 @@ public abstract class BasicMessageConsum
// no point otherwise as the connection will be gone
if (!_session.isClosed() || _session.isClosing())
{
- sendCancel();
+ synchronized(_session.getMessageDeliveryLock())
+ {
+ sendCancel();
+ }
cleanupQueue();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]