Author: kwall
Date: Tue Feb  7 11:30:56 2012
New Revision: 1241430

URL: http://svn.apache.org/viewvc?rev=1241430&view=rev
Log:
QPID-3807: Improve thread safety of JMS Session dispatcher.

Make _dispatcherThread/_dispatcher volatile and remove their unused setters.

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1241430&r1=1241429&r2=1241430&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Feb  7 11:30:56 2012
@@ -202,9 +202,9 @@ public abstract class AMQSession<C exten
 
     private ConcurrentLinkedQueue<Long> _deliveredMessageTags = new ConcurrentLinkedQueue<Long>();
 
-    private Dispatcher _dispatcher;
+    private volatile Dispatcher _dispatcher;
 
-    private Thread _dispatcherThread;
+    private volatile Thread _dispatcherThread;
 
     private MessageFactoryRegistry _messageFactoryRegistry;
 
@@ -309,21 +309,11 @@ public abstract class AMQSession<C exten
         return _dispatcher;
     }
 
-    protected void setDispatcher(Dispatcher dispatcher)
-    {
-        _dispatcher = dispatcher;
-    }
-
     protected Thread getDispatcherThread()
     {
         return _dispatcherThread;
     }
 
-    protected void setDispatcherThread(Thread dispatcherThread)
-    {
-        _dispatcherThread = dispatcherThread;
-    }
-
     /** Holds the message factory factory for this session. */
     protected MessageFactoryRegistry getMessageFactoryRegistry()
     {
@@ -820,7 +810,7 @@ public abstract class AMQSession<C exten
 
         if (e instanceof AMQDisconnectedException)
         {
-            if (_dispatcher != null)
+            if (_dispatcherThread != null)
             {
                 // Failover failed and ain't coming back. Knife the dispatcher.
                 _dispatcherThread.interrupt();
@@ -2326,7 +2316,7 @@ public abstract class AMQSession<C exten
      */
     void start() throws AMQException
     {
-        // Check if the session has perviously been started and suspended, in which case it must be unsuspended.
+        // Check if the session has previously been started and suspended, in which case it must be unsuspended.
         if (_startedAtLeastOnce.getAndSet(true))
         {
             suspendChannel(false);
@@ -2360,7 +2350,7 @@ public abstract class AMQSession<C exten
                 }
                 catch (AMQException e)
                 {
-                    _logger.info("Unsuspending channel threw an exception:" + e);
+                    _logger.info("Unsuspending channel threw an exception:", e);
                 }
             }
         }
@@ -2952,7 +2942,7 @@ public abstract class AMQSession<C exten
                     }
                     catch (AMQException e)
                     {
-                        _logger.info("Suspending channel threw an exception:" + e);
+                        _logger.info("Suspending channel threw an exception:", e);
                     }
                 }
             }
@@ -3185,7 +3175,7 @@ public abstract class AMQSession<C exten
         private final AtomicBoolean _closed = new AtomicBoolean(false);
 
         private final Object _lock = new Object();
-        private String dispatcherID = "" + System.identityHashCode(this);
+        private final String dispatcherID = "" + System.identityHashCode(this);
 
         public Dispatcher()
         {
@@ -3317,7 +3307,7 @@ public abstract class AMQSession<C exten
                     }
                     catch (InterruptedException e)
                     {
-                        // ignore
+                        Thread.currentThread().interrupt();
                     }
                 }
             }
@@ -3332,7 +3322,7 @@ public abstract class AMQSession<C exten
             }
             catch (InterruptedException e)
             {
-                // ignore
+                // ignored as run will exit immediately
             }
 
             if (_dispatcherLogger.isInfoEnabled())
@@ -3383,7 +3373,7 @@ public abstract class AMQSession<C exten
                 }
                 catch (InterruptedException e)
                 {
-                    // pass
+                    Thread.currentThread().interrupt();
                 }
 
                 if (!(message instanceof CloseConsumerMessage)
@@ -3501,7 +3491,7 @@ public abstract class AMQSession<C exten
             }
             catch (AMQException e)
             {
-                _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + AMQSession.this + " due to: " + e);
+                _logger.warn("Unable to " + (_suspend.get() ? "suspend" : "unsuspend") + " session " + AMQSession.this + " due to: ", e);
                 if (_logger.isDebugEnabled())
                 {
                     _logger.debug("Is the _queue empty?" + _queue.isEmpty());



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to