Author: kwall
Date: Tue Dec 16 08:55:22 2014
New Revision: 1645880
URL: http://svn.apache.org/r1645880
Log:
QPID-6272: [Java Broker] Null reference to AMQChannel#defaultQueue once the
queue is deleted
Modified:
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
Modified:
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1645880&r1=1645879&r2=1645880&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Tue Dec 16 08:55:22 2014
@@ -124,6 +124,8 @@ public class AMQChannel
public static final int DEFAULT_PREFETCH = 4096;
private static final Logger _logger = Logger.getLogger(AMQChannel.class);
+ private final DefaultQueueAssociationClearingTask _defaultQueueAssociationClearingTask = new DefaultQueueAssociationClearingTask();
//TODO use Broker property to configure message authorization requirements
private boolean _messageAuthorizationRequired = Boolean.getBoolean(BrokerProperties.PROPERTY_MSG_AUTH);
@@ -140,7 +142,7 @@ public class AMQChannel
private long _deliveryTag = 0;
/** A channel has a default queue (the last declared) that is used when no queue name is explicitly set */
- private AMQQueue _defaultQueue;
+ private volatile AMQQueue _defaultQueue;
/** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */
private int _consumerTag;
@@ -181,11 +183,9 @@ public class AMQChannel
private LogSubject _logSubject;
private volatile boolean _rollingBack;
- private static final Runnable NULL_TASK = new Runnable() { public void run() {} };
private List<MessageInstance> _resendList = new ArrayList<MessageInstance>();
private static final AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
- private long _createTime = System.currentTimeMillis();
private final ClientDeliveryMethod _clientDeliveryMethod;
@@ -1289,17 +1289,6 @@ public class AMQChannel
return "("+ _suspended.get() + ", " + _closing.get() + ", " + _connection.isClosing() + ") "+"["+ _connection.toString()+":"+_channelId+"]";
}
- public void setDefaultQueue(AMQQueue queue)
- {
- _defaultQueue = queue;
- }
-
- public AMQQueue getDefaultQueue()
- {
- return _defaultQueue;
- }
-
-
public boolean isClosing()
{
return _closing.get();
@@ -3585,4 +3574,37 @@ public class AMQChannel
return exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName);
}
+ private void setDefaultQueue(AMQQueue queue)
+ {
+ AMQQueue currentDefaultQueue = _defaultQueue;
+ if (queue != currentDefaultQueue)
+ {
+ if (currentDefaultQueue != null)
+ {
+ currentDefaultQueue.removeDeleteTask(_defaultQueueAssociationClearingTask);
+ }
+ if (queue != null)
+ {
+ queue.addDeleteTask(_defaultQueueAssociationClearingTask);
+ }
+ }
+ _defaultQueue = queue;
+ }
+
+ private AMQQueue getDefaultQueue()
+ {
+ return _defaultQueue;
+ }
+
+ private class DefaultQueueAssociationClearingTask implements Action<AMQQueue>
+ {
+ @Override
+ public void performAction(final AMQQueue queue)
+ {
+ if ( queue == _defaultQueue)
+ {
+ _defaultQueue = null;
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]