Author: rajdavies
Date: Mon Mar 17 06:28:06 2008
New Revision: 637878

URL: http://svn.apache.org/viewvc?rev=637878&view=rev
Log:
tightened synchronization around dispatchQueue

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=637878&r1=637877&r2=637878&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
 Mon Mar 17 06:28:06 2008
@@ -112,7 +112,7 @@
     protected final Map<ConnectionId, ConnectionState> brokerConnectionStates;
     // The broker and wireformat info that was exchanged.
     protected BrokerInfo brokerInfo;
-    protected final List<Command> dispatchQueue = 
Collections.synchronizedList(new LinkedList<Command>());
+    protected final List<Command> dispatchQueue = new LinkedList<Command>();
     protected TaskRunner taskRunner;
     protected final AtomicReference<IOException> transportException = new 
AtomicReference<IOException>();
     protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
@@ -205,7 +205,9 @@
      * @return size of dispatch queue
      */
     public int getDispatchQueueSize() {
-        return dispatchQueue.size();
+        synchronized(dispatchQueue) {
+            return dispatchQueue.size();
+        }
     }
 
     public void serviceTransportException(IOException e) {
@@ -743,7 +745,9 @@
             if (taskRunner == null) {
                 dispatchSync(message);
             } else {
-                dispatchQueue.add(message);
+                synchronized(dispatchQueue) {
+                    dispatchQueue.add(message);
+                }
                 try {
                     taskRunner.wakeup();
                 } catch (InterruptedException e) {
@@ -780,7 +784,7 @@
                     sub.run();
                 }
             }
-            getStatistics().getDequeues().increment();
+            //getStatistics().getDequeues().increment();
         }
     }
 
@@ -800,11 +804,13 @@
             }
 
             if (!dispatchStopped.get()) {
-
-                if (dispatchQueue.isEmpty()) {
-                    return false;
+                Command command = null;
+                synchronized(dispatchQueue) {
+                    if (dispatchQueue.isEmpty()) {
+                        return false;
+                    }
+                    command = dispatchQueue.remove(0);
                 }
-                Command command = dispatchQueue.remove(0);
                 processDispatch(command);
                 return true;
             }
@@ -968,16 +974,19 @@
 
         // Run the MessageDispatch callbacks so that message references get
         // cleaned up.
-        for (Iterator<Command> iter = dispatchQueue.iterator(); 
iter.hasNext();) {
-            Command command = iter.next();
-            if (command.isMessageDispatch()) {
-                MessageDispatch md = (MessageDispatch)command;
-                Runnable sub = md.getTransmitCallback();
-                broker.postProcessDispatch(md);
-                if (sub != null) {
-                    sub.run();
+        synchronized(dispatchQueue) {
+            for (Iterator<Command> iter = dispatchQueue.iterator(); 
iter.hasNext();) {
+                Command command = iter.next();
+                if (command.isMessageDispatch()) {
+                    MessageDispatch md = (MessageDispatch)command;
+                    Runnable sub = md.getTransmitCallback();
+                    broker.postProcessDispatch(md);
+                    if (sub != null) {
+                        sub.run();
+                    }
                 }
             }
+            dispatchQueue.clear();
         }
         //
         // Remove all logical connection associated with this connection


Reply via email to