Author: chirino
Date: Thu Mar 20 08:31:34 2008
New Revision: 639315
URL: http://svn.apache.org/viewvc?rev=639315&view=rev
Log:
Fix queue reference counting.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=639315&r1=639314&r2=639315&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Thu Mar 20 08:31:34 2008
@@ -458,11 +458,10 @@
if (node == null) {
break;
}
- if(isDropped(node)) {
- pending.remove();
- }
- else if (canDispatch(node)) {
- pending.remove();
+
+ pending.remove();
+ if( !isDropped(node) && canDispatch(node)) {
+
// Message may have been sitting in the pending
// list a while waiting for the consumer to ak
the message.
if (node != QueueMessageReference.NULL_MESSAGE
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=639315&r1=639314&r2=639315&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Mar 20 08:31:34 2008
@@ -1002,7 +1002,6 @@
reference.drop();
acknowledge(context, sub, ack, reference);
destinationStatistics.getMessages().decrement();
- reference.decrementReferenceCount();
synchronized(pagedInMessages) {
pagedInMessages.remove(reference.getMessageId());
}
@@ -1056,6 +1055,7 @@
messages.reset();
while (messages.hasNext() && count < toPageIn) {
MessageReference node = messages.next();
+ node.incrementReferenceCount();
messages.remove();
if (!broker.isExpired(node)) {
QueueMessageReference ref =
createMessageReference(node.getMessage());
@@ -1097,7 +1097,6 @@
if (dispatchSelector.canSelect(s, node)) {
if (!s.isFull()) {
s.add(node);
- node.incrementReferenceCount();
target = s;
break;
} else {