Author: kwall
Date: Tue Feb 7 11:30:56 2012
New Revision: 1241430
URL: http://svn.apache.org/viewvc?rev=1241430&view=rev
Log:
QPID-3807: Improve thread safety of JMS Session dispatcher.
Make _dispatcherThread/_dispatcher volatile and remove their unused setters.
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1241430&r1=1241429&r2=1241430&view=diff
==============================================================================
---
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Tue Feb 7 11:30:56 2012
@@ -202,9 +202,9 @@ public abstract class AMQSession<C exten
private ConcurrentLinkedQueue<Long> _deliveredMessageTags = new
ConcurrentLinkedQueue<Long>();
- private Dispatcher _dispatcher;
+ private volatile Dispatcher _dispatcher;
- private Thread _dispatcherThread;
+ private volatile Thread _dispatcherThread;
private MessageFactoryRegistry _messageFactoryRegistry;
@@ -309,21 +309,11 @@ public abstract class AMQSession<C exten
return _dispatcher;
}
- protected void setDispatcher(Dispatcher dispatcher)
- {
- _dispatcher = dispatcher;
- }
-
protected Thread getDispatcherThread()
{
return _dispatcherThread;
}
- protected void setDispatcherThread(Thread dispatcherThread)
- {
- _dispatcherThread = dispatcherThread;
- }
-
/** Holds the message factory factory for this session. */
protected MessageFactoryRegistry getMessageFactoryRegistry()
{
@@ -820,7 +810,7 @@ public abstract class AMQSession<C exten
if (e instanceof AMQDisconnectedException)
{
- if (_dispatcher != null)
+ if (_dispatcherThread != null)
{
// Failover failed and ain't coming back. Knife the dispatcher.
_dispatcherThread.interrupt();
@@ -2326,7 +2316,7 @@ public abstract class AMQSession<C exten
*/
void start() throws AMQException
{
- // Check if the session has perviously been started and suspended, in
which case it must be unsuspended.
+ // Check if the session has previously been started and suspended, in
which case it must be unsuspended.
if (_startedAtLeastOnce.getAndSet(true))
{
suspendChannel(false);
@@ -2360,7 +2350,7 @@ public abstract class AMQSession<C exten
}
catch (AMQException e)
{
- _logger.info("Unsuspending channel threw an exception:" +
e);
+ _logger.info("Unsuspending channel threw an exception:",
e);
}
}
}
@@ -2952,7 +2942,7 @@ public abstract class AMQSession<C exten
}
catch (AMQException e)
{
- _logger.info("Suspending channel threw an exception:"
+ e);
+ _logger.info("Suspending channel threw an exception:",
e);
}
}
}
@@ -3185,7 +3175,7 @@ public abstract class AMQSession<C exten
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final Object _lock = new Object();
- private String dispatcherID = "" + System.identityHashCode(this);
+ private final String dispatcherID = "" + System.identityHashCode(this);
public Dispatcher()
{
@@ -3317,7 +3307,7 @@ public abstract class AMQSession<C exten
}
catch (InterruptedException e)
{
- // ignore
+ Thread.currentThread().interrupt();
}
}
}
@@ -3332,7 +3322,7 @@ public abstract class AMQSession<C exten
}
catch (InterruptedException e)
{
- // ignore
+ // ignored as run will exit immediately
}
if (_dispatcherLogger.isInfoEnabled())
@@ -3383,7 +3373,7 @@ public abstract class AMQSession<C exten
}
catch (InterruptedException e)
{
- // pass
+ Thread.currentThread().interrupt();
}
if (!(message instanceof CloseConsumerMessage)
@@ -3501,7 +3491,7 @@ public abstract class AMQSession<C exten
}
catch (AMQException e)
{
- _logger.warn("Unable to " + (_suspend.get() ? "suspend" :
"unsuspend") + " session " + AMQSession.this + " due to: " + e);
+ _logger.warn("Unable to " + (_suspend.get() ? "suspend" :
"unsuspend") + " session " + AMQSession.this + " due to: ", e);
if (_logger.isDebugEnabled())
{
_logger.debug("Is the _queue empty?" + _queue.isEmpty());
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]