Author: rajdavies
Date: Tue May 27 08:20:41 2008
New Revision: 660555
URL: http://svn.apache.org/viewvc?rev=660555&view=rev
Log:
Apply patch for https://issues.apache.org/activemq/browse/AMQ-1748
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=660555&r1=660554&r2=660555&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Tue May 27 08:20:41 2008
@@ -1062,9 +1062,7 @@
}
final void sendMessage(final ConnectionContext context, Message msg)
throws Exception {
- synchronized (messages) {
- messages.addMessageLast(msg);
- }
+ messages.addMessageLast(msg);
destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment();
messageDelivered(context, msg);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java?rev=660555&r1=660554&r2=660555&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java
Tue May 27 08:20:41 2008
@@ -21,6 +21,7 @@
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
@@ -59,6 +60,7 @@
private boolean flushRequired;
private AtomicBoolean started = new AtomicBoolean();
private MessageReference last = null;
+ private ReentrantLock lock = new ReentrantLock(true);
/**
* @param name
@@ -93,20 +95,25 @@
/**
* @return true if there are no pending messages
*/
- public synchronized boolean isEmpty() {
- if(memoryList.isEmpty() && isDiskListEmpty()){
- return true;
- }
- for (Iterator<MessageReference> iterator = memoryList.iterator();
iterator.hasNext();) {
- MessageReference node = iterator.next();
- if (node== QueueMessageReference.NULL_MESSAGE){
- continue;
- }
- if (!node.isDropped()) {
- return false;
+ public boolean isEmpty() {
+ lock.lock();
+ try {
+ if(memoryList.isEmpty() && isDiskListEmpty()){
+ return true;
+ }
+ for (Iterator<MessageReference> iterator = memoryList.iterator();
iterator.hasNext();) {
+ MessageReference node = iterator.next();
+ if (node== QueueMessageReference.NULL_MESSAGE){
+ continue;
+ }
+ if (!node.isDropped()) {
+ return false;
+ }
+ // We can remove dropped references.
+ iterator.remove();
}
- // We can remove dropped references.
- iterator.remove();
+ } finally {
+ lock.unlock();
}
return isDiskListEmpty();
}
@@ -116,48 +123,71 @@
/**
* reset the cursor
*/
- public synchronized void reset() {
- iterating = true;
- last = null;
- iter = isDiskListEmpty() ? memoryList.iterator() :
getDiskList().listIterator();
+ public void reset() {
+ lock.lock();
+ try {
+ iterating = true;
+ last = null;
+ iter = isDiskListEmpty() ? memoryList.iterator() :
getDiskList().listIterator();
+ } finally {
+ lock.unlock();
+ }
}
- public synchronized void release() {
- iterating = false;
- if (flushRequired) {
- flushRequired = false;
- flushToDisk();
+ public void release() {
+ lock.lock();
+ try {
+ synchronized(this) {
+ iterating = false;
+ this.notifyAll();
+ }
+ if (flushRequired) {
+ flushRequired = false;
+ flushToDisk();
+ }
+ } finally {
+ lock.unlock();
}
}
- public synchronized void destroy() throws Exception {
- stop();
- for (Iterator<MessageReference> i = memoryList.iterator();
i.hasNext();) {
- Message node = (Message)i.next();
- node.decrementReferenceCount();
- }
- memoryList.clear();
- if (!isDiskListEmpty()) {
- getDiskList().clear();
+ public void destroy() throws Exception {
+ lock.lock();
+ try {
+ stop();
+ for (Iterator<MessageReference> i = memoryList.iterator();
i.hasNext();) {
+ Message node = (Message)i.next();
+ node.decrementReferenceCount();
+ }
+ memoryList.clear();
+ if (!isDiskListEmpty()) {
+ getDiskList().clear();
+ }
+ } finally {
+ lock.unlock();
}
}
- public synchronized LinkedList<MessageReference> pageInList(int maxItems) {
- LinkedList<MessageReference> result = new
LinkedList<MessageReference>();
+ public LinkedList<MessageReference> pageInList(int maxItems) {
int count = 0;
- for (Iterator<MessageReference> i = memoryList.iterator(); i.hasNext()
&& count < maxItems;) {
- result.add(i.next());
- count++;
- }
- if (count < maxItems && !isDiskListEmpty()) {
- for (Iterator<MessageReference> i = getDiskList().iterator();
i.hasNext() && count < maxItems;) {
- Message message = (Message)i.next();
- message.setRegionDestination(regionDestination);
- message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
- message.incrementReferenceCount();
- result.add(message);
+ LinkedList<MessageReference> result = new
LinkedList<MessageReference>();
+ lock.lock();
+ try {
+ for (Iterator<MessageReference> i = memoryList.iterator();
i.hasNext() && count < maxItems;) {
+ result.add(i.next());
count++;
}
+ if (count < maxItems && !isDiskListEmpty()) {
+ for (Iterator<MessageReference> i = getDiskList().iterator();
i.hasNext() && count < maxItems;) {
+ Message message = (Message)i.next();
+ message.setRegionDestination(regionDestination);
+
message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
+ message.incrementReferenceCount();
+ result.add(message);
+ count++;
+ }
+ }
+ } finally {
+ lock.unlock();
}
return result;
}
@@ -167,35 +197,52 @@
*
* @param node
*/
- public synchronized void addMessageLast(MessageReference node) {
+ public void addMessageLast(MessageReference node) {
if (!node.isExpired()) {
try {
- regionDestination = node.getMessage().getRegionDestination();
- if (isDiskListEmpty()) {
- if (hasSpace() || this.store==null) {
- memoryList.add(node);
- node.incrementReferenceCount();
- return;
+ lock.lock();
+ try {
+ while (iterating) {
+ lock.unlock();
+ synchronized(this) {
+ try {
+ this.wait();
+ } catch (InterruptedException ie) {}
+ }
+ lock.lock();
}
- }
- if (!hasSpace()) {
+ regionDestination =
node.getMessage().getRegionDestination();
if (isDiskListEmpty()) {
- expireOldMessages();
- if (hasSpace()) {
+ if (hasSpace() || this.store==null) {
memoryList.add(node);
node.incrementReferenceCount();
return;
- } else {
- flushToDisk();
}
}
+ if (!hasSpace()) {
+ if (isDiskListEmpty()) {
+ expireOldMessages();
+ if (hasSpace()) {
+ memoryList.add(node);
+ node.incrementReferenceCount();
+ return;
+ } else {
+ flushToDisk();
+ }
+ }
+ }
+ if (systemUsage.getTempUsage().isFull()) {
+ lock.unlock();
+ systemUsage.getTempUsage().waitForSpace();
+ lock.lock();
+ }
+ getDiskList().add(node);
+ } finally {
+ lock.unlock();
}
- systemUsage.getTempUsage().waitForSpace();
- getDiskList().add(node);
-
} catch (Exception e) {
LOG.error("Caught an Exception adding a message: " + node
- + " first to FilePendingMessageCursor ", e);
+ + " last to FilePendingMessageCursor ", e);
throw new RuntimeException(e);
}
} else {
@@ -208,32 +255,50 @@
*
* @param node
*/
- public synchronized void addMessageFirst(MessageReference node) {
+ public void addMessageFirst(MessageReference node) {
if (!node.isExpired()) {
try {
- regionDestination = node.getMessage().getRegionDestination();
- if (isDiskListEmpty()) {
- if (hasSpace()) {
- memoryList.addFirst(node);
- node.incrementReferenceCount();
- return;
+ lock.lock();
+ try {
+ while (iterating) {
+ lock.unlock();
+ synchronized(this) {
+ try {
+ this.wait();
+ } catch (InterruptedException ie) {}
+ }
+ lock.lock();
}
- }
- if (!hasSpace()) {
+ regionDestination =
node.getMessage().getRegionDestination();
if (isDiskListEmpty()) {
- expireOldMessages();
if (hasSpace()) {
memoryList.addFirst(node);
node.incrementReferenceCount();
return;
- } else {
- flushToDisk();
}
}
+ if (!hasSpace()) {
+ if (isDiskListEmpty()) {
+ expireOldMessages();
+ if (hasSpace()) {
+ memoryList.addFirst(node);
+ node.incrementReferenceCount();
+ return;
+ } else {
+ flushToDisk();
+ }
+ }
+ }
+ if (systemUsage.getTempUsage().isFull()) {
+ lock.unlock();
+ systemUsage.getTempUsage().waitForSpace();
+ lock.lock();
+ }
+ node.decrementReferenceCount();
+ getDiskList().addFirst(node);
+ } finally {
+ lock.unlock();
}
- systemUsage.getTempUsage().waitForSpace();
- node.decrementReferenceCount();
- getDiskList().addFirst(node);
} catch (Exception e) {
LOG.error("Caught an Exception adding a message: " + node
@@ -244,25 +309,38 @@
discard(node);
}
}
-
+
/**
* @return true if there pending messages to dispatch
*/
- public synchronized boolean hasNext() {
- return iter.hasNext();
+ public boolean hasNext() {
+ boolean result;
+ lock.lock();
+ try {
+ result = iter.hasNext();
+ } finally {
+ lock.unlock();
+ }
+ return result;
}
/**
* @return the next pending message
*/
- public synchronized MessageReference next() {
- Message message = (Message)iter.next();
- last = message;
- if (!isDiskListEmpty()) {
- // got from disk
- message.setRegionDestination(regionDestination);
- message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
- message.incrementReferenceCount();
+ public MessageReference next() {
+ Message message;
+ lock.lock();
+ try {
+ message = (Message)iter.next();
+ last = message;
+ if (!isDiskListEmpty()) {
+ // got from disk
+ message.setRegionDestination(regionDestination);
+ message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
+ message.incrementReferenceCount();
+ }
+ } finally {
+ lock.unlock();
}
return message;
}
@@ -270,10 +348,15 @@
/**
* remove the message at the cursor position
*/
- public synchronized void remove() {
- iter.remove();
- if (last != null) {
- last.decrementReferenceCount();
+ public void remove() {
+ lock.lock();
+ try {
+ iter.remove();
+ if (last != null) {
+ last.decrementReferenceCount();
+ }
+ } finally {
+ lock.unlock();
}
}
@@ -281,36 +364,61 @@
* @param node
* @see
org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursor#remove(org.apache.activemq.broker.region.MessageReference)
*/
- public synchronized void remove(MessageReference node) {
- if (memoryList.remove(node)) {
- node.decrementReferenceCount();
- }
- if (!isDiskListEmpty()) {
- getDiskList().remove(node);
+ public void remove(MessageReference node) {
+ lock.lock();
+ try {
+ if (memoryList.remove(node)) {
+ node.decrementReferenceCount();
+ }
+ if (!isDiskListEmpty()) {
+ getDiskList().remove(node);
+ }
+ } finally {
+ lock.unlock();
}
}
/**
* @return the number of pending messages
*/
- public synchronized int size() {
- return memoryList.size() + (isDiskListEmpty() ? 0 :
getDiskList().size());
+ public int size() {
+ int result;
+ lock.lock();
+ try {
+ result = memoryList.size() + (isDiskListEmpty() ? 0 :
getDiskList().size());
+ } finally {
+ lock.unlock();
+ }
+ return result;
}
/**
* clear all pending messages
*/
- public synchronized void clear() {
- memoryList.clear();
- if (!isDiskListEmpty()) {
- getDiskList().clear();
+ public void clear() {
+ lock.lock();
+ try {
+ memoryList.clear();
+ if (!isDiskListEmpty()) {
+ getDiskList().clear();
+ }
+ last=null;
+ } finally {
+ lock.unlock();
}
- last=null;
}
- public synchronized boolean isFull() {
- // we always have space - as we can persist to disk
- return false;
+ public boolean isFull() {
+ boolean result;
+ lock.lock();
+ try {
+ // we always have space - as we can persist to disk
+ // TODO: not necessarily true.
+ result = false;
+ } finally {
+ lock.unlock();
+ }
+ return result;
}
public boolean hasMessagesBufferedToDeliver() {
@@ -324,7 +432,8 @@
public void onUsageChanged(Usage usage, int oldPercentUsage,
int newPercentUsage) {
if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
- synchronized (this) {
+ lock.lock();
+ try {
flushRequired = true;
if (!iterating) {
expireOldMessages();
@@ -333,6 +442,8 @@
flushRequired = false;
}
}
+ } finally {
+ lock.unlock();
}
}
}
@@ -345,31 +456,39 @@
return hasSpace() && isDiskListEmpty();
}
- protected synchronized void expireOldMessages() {
- if (!memoryList.isEmpty()) {
- LinkedList<MessageReference> tmpList = new
LinkedList<MessageReference>(this.memoryList);
- this.memoryList = new LinkedList<MessageReference>();
- while (!tmpList.isEmpty()) {
- MessageReference node = tmpList.removeFirst();
- if (node.isExpired()) {
- discard(node);
- }else {
- memoryList.add(node);
- }
+ protected void expireOldMessages() {
+ lock.lock();
+ try {
+ if (!memoryList.isEmpty()) {
+ LinkedList<MessageReference> tmpList = new
LinkedList<MessageReference>(this.memoryList);
+ this.memoryList = new LinkedList<MessageReference>();
+ while (!tmpList.isEmpty()) {
+ MessageReference node = tmpList.removeFirst();
+ if (node.isExpired()) {
+ discard(node);
+ }else {
+ memoryList.add(node);
+ }
+ }
}
+ } finally {
+ lock.unlock();
}
-
}
- protected synchronized void flushToDisk() {
-
- if (!memoryList.isEmpty()) {
- while (!memoryList.isEmpty()) {
- MessageReference node = memoryList.removeFirst();
- node.decrementReferenceCount();
- getDiskList().addLast(node);
+ protected void flushToDisk() {
+ lock.lock();
+ try {
+ if (!memoryList.isEmpty()) {
+ while (!memoryList.isEmpty()) {
+ MessageReference node = memoryList.removeFirst();
+ node.decrementReferenceCount();
+ getDiskList().addLast(node);
+ }
+ memoryList.clear();
}
- memoryList.clear();
+ } finally {
+ lock.unlock();
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java?rev=660555&r1=660554&r2=660555&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreQueueCursor.java
Tue May 27 08:20:41 2008
@@ -16,12 +16,10 @@
*/
package org.apache.activemq.broker.region.cursors;
-import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message;
-import org.apache.activemq.kaha.Store;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -89,7 +87,7 @@
pendingCount = 0;
}
- public synchronized void addMessageLast(MessageReference node) throws
Exception {
+ public void addMessageLast(MessageReference node) throws Exception {
if (node != null) {
Message msg = node.getMessage();
if (started) {
@@ -104,7 +102,7 @@
}
}
- public synchronized void addMessageFirst(MessageReference node) throws
Exception {
+ public void addMessageFirst(MessageReference node) throws Exception {
if (node != null) {
Message msg = node.getMessage();
if (started) {
@@ -142,6 +140,11 @@
MessageReference result = currentCursor != null ? currentCursor.next()
: null;
return result;
}
+
+ public synchronized void release() {
+ nonPersistent.release();
+ persistent.release();
+ }
public synchronized void remove() {
if (currentCursor != null) {
@@ -159,7 +162,7 @@
pendingCount--;
}
- public synchronized void reset() {
+ public void reset() {
nonPersistent.reset();
persistent.reset();
}