Author: rajdavies
Date: Mon Mar 17 06:28:06 2008
New Revision: 637878
URL: http://svn.apache.org/viewvc?rev=637878&view=rev
Log:
tightened synchronization around dispatchQueue
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=637878&r1=637877&r2=637878&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Mon Mar 17 06:28:06 2008
@@ -112,7 +112,7 @@
protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
// The broker and wireformat info that was exchanged.
protected BrokerInfo brokerInfo;
- protected final List<Command> dispatchQueue = Collections.synchronizedList(new LinkedList<Command>());
+ protected final List<Command> dispatchQueue = new LinkedList<Command>();
protected TaskRunner taskRunner;
protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
@@ -205,7 +205,9 @@
* @return size of dispatch queue
*/
public int getDispatchQueueSize() {
- return dispatchQueue.size();
+ synchronized(dispatchQueue) {
+ return dispatchQueue.size();
+ }
}
public void serviceTransportException(IOException e) {
@@ -743,7 +745,9 @@
if (taskRunner == null) {
dispatchSync(message);
} else {
- dispatchQueue.add(message);
+ synchronized(dispatchQueue) {
+ dispatchQueue.add(message);
+ }
try {
taskRunner.wakeup();
} catch (InterruptedException e) {
@@ -780,7 +784,7 @@
sub.run();
}
}
- getStatistics().getDequeues().increment();
+ //getStatistics().getDequeues().increment();
}
}
@@ -800,11 +804,13 @@
}
if (!dispatchStopped.get()) {
-
- if (dispatchQueue.isEmpty()) {
- return false;
+ Command command = null;
+ synchronized(dispatchQueue) {
+ if (dispatchQueue.isEmpty()) {
+ return false;
+ }
+ command = dispatchQueue.remove(0);
}
- Command command = dispatchQueue.remove(0);
processDispatch(command);
return true;
}
@@ -968,16 +974,19 @@
// Run the MessageDispatch callbacks so that message references get
// cleaned up.
- for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) {
- Command command = iter.next();
- if (command.isMessageDispatch()) {
- MessageDispatch md = (MessageDispatch)command;
- Runnable sub = md.getTransmitCallback();
- broker.postProcessDispatch(md);
- if (sub != null) {
- sub.run();
+ synchronized(dispatchQueue) {
+ for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext();) {
+ Command command = iter.next();
+ if (command.isMessageDispatch()) {
+ MessageDispatch md = (MessageDispatch)command;
+ Runnable sub = md.getTransmitCallback();
+ broker.postProcessDispatch(md);
+ if (sub != null) {
+ sub.run();
+ }
}
}
+ dispatchQueue.clear();
}
//
// Remove all logical connection associated with this connection