Author: rajdavies
Date: Mon Sep 24 10:00:08 2007
New Revision: 578877
URL: http://svn.apache.org/viewvc?rev=578877&view=rev
Log:
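Fix reference counting in FilePendingMessageCursor: increment a message's reference count when it is added to the in-memory list and decrement it when it is removed.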
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=578877&r1=578876&r2=578877&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
Mon Sep 24 10:00:08 2007
@@ -28,11 +28,9 @@
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
-import org.apache.activemq.usage.SystemUsage;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
/**
* persist pending messages pending message (messages awaiting dispatch to a
@@ -42,7 +40,6 @@
*/
public class FilePendingMessageCursor extends AbstractPendingMessageCursor
implements UsageListener {
- private static final Log LOG =
LogFactory.getLog(FilePendingMessageCursor.class);
private static final AtomicLong NAME_COUNT = new AtomicLong();
private Store store;
@@ -54,6 +51,7 @@
private boolean iterating;
private boolean flushRequired;
private AtomicBoolean started = new AtomicBoolean();
+ private MessageReference last = null;
/**
* @param name
@@ -85,7 +83,8 @@
* @return true if there are no pending messages
*/
public synchronized boolean isEmpty() {
- return memoryList.isEmpty() && isDiskListEmpty();
+ boolean result = memoryList.isEmpty() && isDiskListEmpty();
+ return result;
}
/**
@@ -93,6 +92,7 @@
*/
public synchronized void reset() {
iterating = true;
+ last = null;
iter = isDiskListEmpty() ? memoryList.iterator() :
getDiskList().listIterator();
}
@@ -145,6 +145,7 @@
regionDestination = node.getMessage().getRegionDestination();
if (isSpaceInMemoryList()) {
memoryList.add(node);
+ node.incrementReferenceCount();
} else {
flushToDisk();
node.decrementReferenceCount();
@@ -166,6 +167,7 @@
regionDestination = node.getMessage().getRegionDestination();
if (isSpaceInMemoryList()) {
memoryList.addFirst(node);
+ node.incrementReferenceCount();
} else {
flushToDisk();
systemUsage.getTempUsage().waitForSpace();
@@ -189,6 +191,7 @@
*/
public synchronized MessageReference next() {
Message message = (Message)iter.next();
+ last = message;
if (!isDiskListEmpty()) {
// got from disk
message.setRegionDestination(regionDestination);
@@ -202,6 +205,9 @@
*/
public synchronized void remove() {
iter.remove();
+ if (last != null) {
+ last.decrementReferenceCount();
+ }
}
/**
@@ -209,7 +215,9 @@
* @see
org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
*/
public synchronized void remove(MessageReference node) {
- memoryList.remove(node);
+ if (memoryList.remove(node)) {
+ node.decrementReferenceCount();
+ }
if (!isDiskListEmpty()) {
getDiskList().remove(node);
}
@@ -230,6 +238,7 @@
if (!isDiskListEmpty()) {
getDiskList().clear();
}
+ last=null;
}
public synchronized boolean isFull() {