Author: jgomes
Date: Wed Sep 17 11:33:05 2008
New Revision: 696392
URL: http://svn.apache.org/viewvc?rev=696392&view=rev
Log:
Rolled back most of the changes made to the threading. Kept those that were
definite improvements, and made changes to the existing threading based on the
contribution. Now the performance is back to what it should be with improved
robustness.
Fixes [AMQNET-112]. (See https://issues.apache.org/activemq/browse/AMQNET-112)
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=696392&r1=696391&r2=696392&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
Wed Sep 17 11:33:05 2008
@@ -36,19 +36,20 @@
private TimeSpan requestTimeout;
private BrokerInfo brokerInfo; // from broker
private WireFormatInfo brokerWireFormatInfo; // from broker
- private readonly IList sessions = new ArrayList();
+ private readonly IList sessions = ArrayList.Synchronized(new
ArrayList());
/// <summary>
/// Private object used for synchronization, instead of public
"this"
/// </summary>
private readonly object myLock = new object();
private bool asyncSend = false;
private bool asyncClose = true;
- private volatile bool closed;
- private volatile bool triedConnect;
+ private bool connected = false;
+ private bool closed = false;
+ private bool closing = false;
private long sessionCounter = 0;
private long temporaryDestinationCounter = 0;
private long localTransactionCounter;
- private bool started = false;
+ private readonly AtomicBoolean started = new
AtomicBoolean(false);
private bool disposed = false;
public Connection(Uri connectionUri, ITransport transport,
ConnectionInfo info)
@@ -111,11 +112,10 @@
public void Start()
{
CheckConnected();
- lock(myLock)
+ if(started.CompareAndSet(false, true))
{
- if(!started)
+ lock(sessions.SyncRoot)
{
- started = true;
foreach(Session session in sessions)
{
session.StartAsyncDelivery();
@@ -130,7 +130,7 @@
/// </summary>
public bool IsStarted
{
- get { return started; }
+ get { return started.Value; }
}
/// <summary>
@@ -140,11 +140,10 @@
public void Stop()
{
CheckConnected();
- lock(myLock)
+ if(started.CompareAndSet(true, false))
{
- if(started)
+ lock(sessions.SyncRoot)
{
- started = false;
foreach(Session session in sessions)
{
session.StopAsyncDelivery();
@@ -174,22 +173,20 @@
System.Collections.Specialized.StringDictionary map =
URISupport.ParseQuery(this.brokerUri.Query);
URISupport.SetProperties(session, map, "session.");
- lock(myLock)
+ if(IsStarted)
{
- if(IsStarted)
- {
- session.StartAsyncDelivery();
- }
-
- sessions.Add(session);
+ session.StartAsyncDelivery();
}
+
+ sessions.Add(session);
return session;
}
public void RemoveSession(Session session)
{
DisposeOf(session.SessionId);
- lock(myLock)
+
+ if(!this.closing)
{
sessions.Remove(session);
}
@@ -197,41 +194,22 @@
public void Close()
{
- if(closed)
- {
- return;
- }
-
- //
- // Do a first-run close of sessions after brief
synchronization
- //
- IList sessionsCopy;
- lock(myLock)
- {
- sessionsCopy = new ArrayList(sessions);
- }
-
- foreach(Session session in sessionsCopy)
- {
- session.Close();
- }
-
lock(myLock)
{
- if(closed)
+ if(this.closed)
{
return;
}
try
{
- //
- // Copy again for safe enumeration.
Always assume
- // that closing a session modifies the
sessions list.
- //
- foreach(Session session in new
ArrayList(sessions))
+ this.closing = true;
+ lock(sessions.SyncRoot)
{
- session.Close();
+ foreach(Session session in
sessions)
+ {
+ session.Close();
+ }
}
sessions.Clear();
@@ -245,8 +223,9 @@
}
finally
{
- closed = true;
- transport = null;
+ this.transport = null;
+ this.closed = true;
+ this.closing = false;
}
}
}
@@ -314,7 +293,7 @@
get { return info.ClientId; }
set
{
- if(triedConnect)
+ if(connected)
{
throw new NMSException("You cannot
change the ClientId once the Connection is connected");
}
@@ -397,9 +376,7 @@
/// </summary>
public String CreateTemporaryDestinationName()
{
- return info.ConnectionId.Value
- + ":"
- + Interlocked.Increment(ref
temporaryDestinationCounter);
+ return info.ConnectionId.Value + ":" +
Interlocked.Increment(ref temporaryDestinationCounter);
}
/// <summary>
@@ -420,30 +397,14 @@
throw new ConnectionClosedException();
}
- if(triedConnect)
- {
- return;
- }
-
- lock(myLock)
+ if(!connected)
{
- if(closed)
- {
- throw new ConnectionClosedException();
- }
-
- if(triedConnect)
- {
- return;
- }
-
- // Set this in advance, to short-circuit
SyncRequest's call to this method
- triedConnect = true;
-
+ connected = true;
// now lets send the connection and see if we
get an ack/nak
if(null == SyncRequest(info))
{
closed = true;
+ connected = false;
throw new ConnectionClosedException();
}
}
@@ -475,12 +436,9 @@
else if(command is ShutdownInfo)
{
//ShutdownInfo info = (ShutdownInfo)command;
- lock(myLock)
+ if(!closing && !closed)
{
- if(!closed)
- {
- OnException(commandTransport,
new NMSException("Broker closed this connection."));
- }
+ OnException(commandTransport, new
NMSException("Broker closed this connection."));
}
}
else
@@ -493,7 +451,7 @@
{
bool dispatched = false;
- lock(myLock)
+ lock(sessions.SyncRoot)
{
foreach(Session session in sessions)
{
@@ -569,6 +527,5 @@
answer.SessionId = sessionId;
return answer;
}
-
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs?rev=696392&r1=696391&r2=696392&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/DispatchingThread.cs
Wed Sep 17 11:33:05 2008
@@ -27,7 +27,7 @@
public delegate void ExceptionHandler(Exception exception);
private readonly AutoResetEvent m_event = new
AutoResetEvent(false);
- private readonly ManualResetEvent m_stopEvent = new
ManualResetEvent(false);
+ private bool m_bStopFlag = false;
private Thread m_thread = null;
private readonly DispatchFunction m_dispatchFunc;
private event ExceptionHandler m_exceptionListener;
@@ -37,18 +37,7 @@
m_dispatchFunc = dispatchFunc;
}
- public bool IsStarted
- {
- get
- {
- lock(this)
- {
- return (null != m_thread);
- }
- }
- }
-
- // TODO can't use EventWaitHandle on MONO 1.0
+ // TODO can't use EventWaitHandle on MONO 1.0
public AutoResetEvent EventHandle
{
get { return m_event; }
@@ -72,9 +61,10 @@
{
if (m_thread == null)
{
- m_stopEvent.Reset();
+ m_bStopFlag = false;
m_thread = new Thread(new
ThreadStart(MyThreadFunc));
m_thread.IsBackground = true;
+ m_event.Set();
Tracer.Info("Starting dispatcher thread
for session");
m_thread.Start();
}
@@ -90,12 +80,16 @@
internal void Stop(int timeoutMilliseconds)
{
Tracer.Info("Stopping dispatcher thread for session");
- Thread localThread;
+ Thread localThread = null;
lock (this)
{
localThread = m_thread;
m_thread = null;
- m_stopEvent.Set();
+ if (!m_bStopFlag)
+ {
+ m_bStopFlag = true;
+ m_event.Set();
+ }
}
if(localThread!=null)
{
@@ -111,43 +105,49 @@
private void MyThreadFunc()
{
Tracer.Info("Dispatcher thread started");
-
- //
- // Put m_stopEvent first so it is preferred if both are
signaled
- //
- WaitHandle[] signals = new WaitHandle[] {
- m_stopEvent,
- m_event
- };
- const int kStopEventOffset = 0;
-
try
{
while (true) // loop forever (well, at least
until we've been asked to stop)
{
- try
+ lock (this)
{
- m_dispatchFunc();
+ if (m_bStopFlag)
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ break;
+
+
}
- catch(ThreadAbortException)
+
+ try
{
- // Throw for handling down below
- throw;
+ m_dispatchFunc();
}
- catch(Exception ex)
+ catch (Exception ex)
{
if(m_exceptionListener != null)
{
m_exceptionListener(ex);
}
}
-
- int sigOffset =
WaitHandle.WaitAny(signals);
- if(kStopEventOffset == sigOffset)
- {
- break;
- }
- // otherwise, continue the loop
+ m_event.WaitOne();
}
Tracer.Info("Dispatcher thread stopped");
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=696392&r1=696391&r2=696392&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
Wed Sep 17 11:33:05 2008
@@ -28,21 +28,22 @@
/// </summary>
public class Session : ISession
{
- private long consumerCounter;
- private readonly IDictionary consumers = new Hashtable();
- private readonly IDictionary producers = new Hashtable();
- private readonly DispatchingThread dispatchingThread;
/// <summary>
/// Private object used for synchronization, instead of public
"this"
/// </summary>
private readonly object myLock = new object();
+ private long consumerCounter;
+ private readonly IDictionary consumers =
Hashtable.Synchronized(new Hashtable());
+ private readonly IDictionary producers =
Hashtable.Synchronized(new Hashtable());
+ private readonly DispatchingThread dispatchingThread;
private DispatchingThread.ExceptionHandler
dispatchingThread_ExceptionHandler;
private readonly SessionInfo info;
private long producerCounter;
+ internal bool startedAsyncDelivery = false;
private bool disposed = false;
- private volatile bool closed = false;
-
- private const int kMAX_STOP_ASYNC_MS = 30000;
+ private bool closed = false;
+ private bool closing = false;
+ private TimeSpan MAX_THREAD_WAIT =
TimeSpan.FromMilliseconds(30000);
public Session(Connection connection, SessionInfo info,
AcknowledgementMode acknowledgementMode)
{
@@ -153,60 +154,37 @@
public void Close()
{
- if(closed)
- {
- return;
- }
-
- //
- // Do a first-run close of consumer and producers after
- // brief synchronization
- //
- IList consumersCopy;
- IList producersCopy;
-
- lock(myLock)
- {
- consumersCopy = new ArrayList(consumers.Values);
- producersCopy = new ArrayList(producers.Values);
- }
-
- foreach(MessageConsumer consumer in consumersCopy)
- {
- consumer.Close();
- }
-
- foreach(MessageProducer producer in producersCopy)
- {
- producer.Close();
- }
-
lock(myLock)
{
- if(closed)
+ if(this.closed)
{
return;
}
try
{
+ this.closing = true;
StopAsyncDelivery();
- // Copy again for safe enumeration
- foreach(MessageConsumer consumer in new
ArrayList(consumers.Values))
+ Connection.RemoveSession(this);
+ lock(consumers.SyncRoot)
{
- consumer.Close();
+ foreach(MessageConsumer
consumer in consumers.Values)
+ {
+ consumer.Close();
+ }
}
consumers.Clear();
- // Copy again for safe enumeration
- foreach(MessageProducer producer in new
ArrayList(producers.Values))
+ lock(producers.SyncRoot)
{
- producer.Close();
+ foreach(MessageProducer
producer in producers.Values)
+ {
+ producer.Close();
+ }
}
producers.Clear();
- Connection.RemoveSession(this);
}
- catch (Exception ex)
+ catch(Exception ex)
{
Tracer.ErrorFormat("Error during
session close: {0}", ex);
}
@@ -214,6 +192,7 @@
{
this.connection = null;
this.closed = true;
+ this.closing = false;
}
}
}
@@ -227,24 +206,19 @@
{
ProducerInfo command = CreateProducerInfo(destination);
ProducerId producerId = command.ProducerId;
- MessageProducer producer = new MessageProducer(this,
command);
- lock(myLock)
- {
- producers[producerId] = producer;
- }
+ MessageProducer producer = null;
try
{
+ producer = new MessageProducer(this, command);
+ producers[producerId] = producer;
this.DoSend(command);
}
catch(Exception)
{
- //
- // DoSend failed. No need to call
MessageProducer.Close
- //
- lock(myLock)
+ if(producer != null)
{
- producers.Remove(producerId);
+ producer.Close();
}
throw;
@@ -272,31 +246,23 @@
ConsumerId consumerId = command.ConsumerId;
MessageConsumer consumer = null;
- consumer = new MessageConsumer(this, command,
this.AcknowledgementMode);
- // let's register the consumer first in case we start
dispatching messages immediately
- lock(myLock)
- {
- consumers[consumerId] = consumer;
- }
-
try
{
+ consumer = new MessageConsumer(this, command,
this.AcknowledgementMode);
+ // lets register the consumer first in case we
start dispatching messages immediately
+ consumers[consumerId] = consumer;
this.DoSend(command);
+ return consumer;
}
catch(Exception)
{
- //
- // DoSend failed. No need to call
MessageProducer.Close
- //
- lock(myLock)
+ if(consumer != null)
{
- consumers.Remove(consumerId);
+ consumer.Close();
}
throw;
}
-
- return consumer;
}
public IMessageConsumer CreateDurableConsumer(ITopic
destination, string name, string selector, bool noLocal)
@@ -307,25 +273,18 @@
command.NoLocal = noLocal;
MessageConsumer consumer = null;
- consumer = new MessageConsumer(this, command,
this.AcknowledgementMode);
- // let's register the consumer first in case we start
dispatching messages immediately
- lock(myLock)
- {
- consumers[consumerId] = consumer;
- }
-
try
{
+ consumer = new MessageConsumer(this, command,
this.AcknowledgementMode);
+ // lets register the consumer first in case we
start dispatching messages immediately
+ consumers[consumerId] = consumer;
this.DoSend(command);
}
catch(Exception)
{
- //
- // DoSend failed; no need to call
MessageConsumer.Close
- //
- lock(myLock)
+ if(consumer != null)
{
- consumers.Remove(consumerId);
+ consumer.Close();
}
throw;
@@ -336,14 +295,9 @@
public void DeleteDurableConsumer(string name)
{
- IConnection conn = this.Connection;
RemoveSubscriptionInfo command = new
RemoveSubscriptionInfo();
- if(null != conn)
- {
- command.ConnectionId = Connection.ConnectionId;
- command.ClientId = Connection.ClientId;
- }
-
+ command.ConnectionId = Connection.ConnectionId;
+ command.ClientId = Connection.ClientId;
command.SubcriptionName = name;
this.DoSend(command);
}
@@ -441,7 +395,7 @@
this.TransactionContext.Rollback();
// lets ensure all the consumers redeliver any rolled
back messages
- lock(myLock)
+ lock(consumers.SyncRoot)
{
foreach(MessageConsumer consumer in
consumers.Values)
{
@@ -538,7 +492,7 @@
public void DisposeOf(ConsumerId objectId)
{
Connection.DisposeOf(objectId);
- lock(myLock)
+ if(!this.closing)
{
consumers.Remove(objectId);
}
@@ -547,7 +501,7 @@
public void DisposeOf(ProducerId objectId)
{
Connection.DisposeOf(objectId);
- lock(myLock)
+ if(!this.closing)
{
producers.Remove(objectId);
}
@@ -556,11 +510,7 @@
public bool DispatchMessage(ConsumerId consumerId, Message
message)
{
bool dispatched = false;
- MessageConsumer consumer;
- lock(myLock)
- {
- consumer = (MessageConsumer)
consumers[consumerId];
- }
+ MessageConsumer consumer = (MessageConsumer)
consumers[consumerId];
if(consumer != null)
{
@@ -579,7 +529,7 @@
{
// lets iterate through each consumer created by this
session
// ensuring that they have all pending messages
dispatched
- lock(myLock)
+ lock(consumers.SyncRoot)
{
foreach(MessageConsumer consumer in
consumers.Values)
{
@@ -588,30 +538,6 @@
}
}
- /// <summary>
- /// Returns a copy of the current consumers in a thread safe
way to avoid concurrency
- /// problems if the consumers are changed in another thread
- /// </summary>
- protected ICollection GetConsumers()
- {
- lock(myLock)
- {
- return new ArrayList(consumers.Values);
- }
- }
-
- /// <summary>
- /// Returns a copy of the current consumers in a thread safe
way to avoid concurrency
- /// problems if the consumers are changed in another thread
- /// </summary>
- protected ICollection GetProducers()
- {
- lock(myLock)
- {
- return new ArrayList(producers.Values);
- }
- }
-
protected virtual ConsumerInfo CreateConsumerInfo(IDestination
destination, string selector)
{
ConsumerInfo answer = new ConsumerInfo();
@@ -669,19 +595,21 @@
internal void StopAsyncDelivery()
{
- if(dispatchingThread.IsStarted)
+ if(startedAsyncDelivery)
{
this.dispatchingThread.ExceptionListener -=
this.dispatchingThread_ExceptionHandler;
- dispatchingThread.Stop(kMAX_STOP_ASYNC_MS);
+ dispatchingThread.Stop((int)
MAX_THREAD_WAIT.TotalMilliseconds);
+ startedAsyncDelivery = false;
}
}
internal void StartAsyncDelivery()
{
- if(!dispatchingThread.IsStarted)
+ if(!startedAsyncDelivery)
{
this.dispatchingThread.ExceptionListener +=
this.dispatchingThread_ExceptionHandler;
dispatchingThread.Start();
+ startedAsyncDelivery = true;
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=696392&r1=696391&r2=696392&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
Wed Sep 17 11:33:05 2008
@@ -25,7 +25,6 @@
namespace Apache.NMS.ActiveMQ.Transport.Tcp
{
-
/// <summary>
/// An implementation of ITransport that uses sockets to communicate
with the broker
/// </summary>
@@ -38,7 +37,7 @@
private BinaryWriter socketWriter;
private Thread readThread;
private bool started;
- private volatile bool closed;
+ private AtomicBoolean closed = new AtomicBoolean(false);
private volatile bool seenShutdown;
private TimeSpan maxWait =
TimeSpan.FromMilliseconds(Timeout.Infinite);
@@ -46,7 +45,6 @@
private ExceptionHandler exceptionHandler;
private TimeSpan MAX_THREAD_WAIT =
TimeSpan.FromMilliseconds(30000);
-
public TcpTransport(Socket socket, IWireFormat wireformat)
{
this.socket = socket;
@@ -114,7 +112,7 @@
{
try
{
- if(closed)
+ if(closed.Value)
{
throw new
InvalidOperationException("Error writing to broker. Transport connection is
closed.");
}
@@ -129,6 +127,7 @@
}
catch(Exception ex)
{
+ Monitor.Exit(myLock);
if (command.ResponseRequired)
{
// Make sure that something
higher up doesn't get blocked.
@@ -183,17 +182,10 @@
public void Close()
{
- if(!closed)
+ if(closed.CompareAndSet(false, true))
{
lock(myLock)
{
- if(closed)
- {
- return;
- }
-
- closed = true;
-
try
{
socket.Shutdown(SocketShutdown.Both);
@@ -300,7 +292,7 @@
// An exception in the command handler may not be fatal
to the transport, so
// these are simply reported to the exceptionHandler.
//
- while(!closed)
+ while(!closed.Value)
{
Command command = null;
@@ -311,11 +303,14 @@
catch(Exception ex)
{
command = null;
- if(!closed && !seenShutdown)
+ if(!closed.Value)
{
// Close the socket as there's
little that can be done with this transport now.
Close();
- this.exceptionHandler(this, ex);
+ if(!seenShutdown)
+ {
+
this.exceptionHandler(this, ex);
+ }
}
break;