Refactor the CheckConnected function to handle multiple threads attempting to check connection status against an offline broker. Guard against unwanted exceptions being thrown when indexing into a connection state array that has not been fully set up because the broker is offline.
Fixes [AMQNET-331]. (See https://issues.apache.org/jira/browse/AMQNET-331) Project: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/commit/3d2d37af Tree: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/tree/3d2d37af Diff: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/diff/3d2d37af Branch: refs/heads/1.5.x Commit: 3d2d37af432882cbc44713d46c1acf4cc8208a01 Parents: 67916c2 Author: Jim Gomes <jgo...@apache.org> Authored: Fri Jun 17 23:58:06 2011 +0000 Committer: Jim Gomes <jgo...@apache.org> Committed: Fri Jun 17 23:58:06 2011 +0000 ---------------------------------------------------------------------- src/main/csharp/Connection.cs | 71 +++++++++++++++----- src/main/csharp/State/ConnectionState.cs | 38 +++++------ src/main/csharp/State/ConnectionStateTracker.cs | 23 ++++--- src/main/csharp/State/SynchronizedObjects.cs | 8 +++ src/main/csharp/Transport/MutexTransport.cs | 6 +- 5 files changed, 99 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/3d2d37af/src/main/csharp/Connection.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/Connection.cs b/src/main/csharp/Connection.cs index ffaf180..4ca6aa5 100755 --- a/src/main/csharp/Connection.cs +++ b/src/main/csharp/Connection.cs @@ -501,14 +501,7 @@ namespace Apache.NMS.Stomp public Response SyncRequest(Command command) { - try - { - return SyncRequest(command, this.RequestTimeout); - } - catch(Exception ex) - { - throw NMSExceptionSupport.Create(ex); - } + return SyncRequest(command, this.RequestTimeout); } public Response SyncRequest(Command command, TimeSpan requestTimeout) @@ -546,7 +539,13 @@ namespace Apache.NMS.Stomp } } - protected void CheckConnected() + private object checkConnectedLock = new object(); + + /// <summary> + /// Check and ensure that 
the connection object is connected. If it is not + /// connected or is closed, a ConnectionClosedException is thrown. + /// </summary> + internal void CheckConnected() { if(closed.Value) { @@ -555,17 +554,57 @@ namespace Apache.NMS.Stomp if(!connected.Value) { - if(!this.userSpecifiedClientID) + DateTime timeoutTime = DateTime.Now + this.RequestTimeout; + int waitCount = 1; + + while(true) { - this.info.ClientId = this.clientIdGenerator.GenerateId(); + if(Monitor.TryEnter(checkConnectedLock)) + { + try + { + if(!connected.Value) + { + if(!this.userSpecifiedClientID) + { + this.info.ClientId = this.clientIdGenerator.GenerateId(); + } + + try + { + if(null != transport) + { + // Send the connection and see if an ack/nak is returned. + Response response = transport.Request(this.info, this.RequestTimeout); + if(!(response is ExceptionResponse)) + { + connected.Value = true; + } + } + } + catch + { + } + } + } + finally + { + Monitor.Exit(checkConnectedLock); + } + } + + if(connected.Value || DateTime.Now > timeoutTime) + { + break; + } + + // Back off from being overly aggressive. Having too many threads + // aggressively trying to connect to a down broker pegs the CPU. 
+ Thread.Sleep(5 * (waitCount++)); } - connected.Value = true; - // now lets send the connection and see if we get an ack/nak - if(null == SyncRequest(info)) + if(!connected.Value) { - closed.Value = true; - connected.Value = false; throw new ConnectionClosedException(); } } http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/3d2d37af/src/main/csharp/State/ConnectionState.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/State/ConnectionState.cs b/src/main/csharp/State/ConnectionState.cs index d568a47..c39d67f 100644 --- a/src/main/csharp/State/ConnectionState.cs +++ b/src/main/csharp/State/ConnectionState.cs @@ -25,8 +25,7 @@ namespace Apache.NMS.Stomp.State { ConnectionInfo info; - private readonly AtomicDictionary<ConsumerId, ConsumerState> consumers = - new AtomicDictionary<ConsumerId, ConsumerState>(); + private readonly AtomicDictionary<ConsumerId, ConsumerState> consumers = new AtomicDictionary<ConsumerId, ConsumerState>(); private readonly Atomic<bool> _shutdown = new Atomic<bool>(false); public ConnectionState(ConnectionInfo info) @@ -49,26 +48,25 @@ namespace Apache.NMS.Stomp.State { get { - #if DEBUG - try + ConsumerState consumerState; + + if(consumers.TryGetValue(id, out consumerState)) { - #endif - return consumers[id]; - #if DEBUG + return consumerState; } - catch(System.Collections.Generic.KeyNotFoundException ex) + +#if DEBUG + // Useful for diagnosing missing consumer ids + string consumerList = string.Empty; + foreach(ConsumerId consumerId in consumers.Keys) { - // Useful for diagnosing missing consumer ids - string consumerList = string.Empty; - foreach(ConsumerId consumerId in consumers.Keys) - { - consumerList += consumerId.ToString() + "\n"; - } - System.Diagnostics.Debug.Assert(false, - string.Format("Consumer '{0}' did not exist in the consumers collection.\n\nConsumers:-\n{1}", id, consumerList)); - throw ex; + consumerList += consumerId.ToString() + "\n"; } - #endif + 
System.Diagnostics.Debug.Assert(false, + string.Format("Consumer '{0}' did not exist in the consumers collection.\n\nConsumers:-\n{1}", id, consumerList)); +#endif + return null; } } @@ -80,7 +78,9 @@ namespace Apache.NMS.Stomp.State public ConsumerState removeConsumer(ConsumerId id) { - ConsumerState ret = consumers[id]; + ConsumerState ret = null; + + consumers.TryGetValue(id, out ret); consumers.Remove(id); return ret; } http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/3d2d37af/src/main/csharp/State/ConnectionStateTracker.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/State/ConnectionStateTracker.cs b/src/main/csharp/State/ConnectionStateTracker.cs index d9dbc9d..ccf76a8 100644 --- a/src/main/csharp/State/ConnectionStateTracker.cs +++ b/src/main/csharp/State/ConnectionStateTracker.cs @@ -31,8 +31,7 @@ namespace Apache.NMS.Stomp.State { private static readonly Tracked TRACKED_RESPONSE_MARKER = new Tracked(null); - protected Dictionary<ConnectionId, ConnectionState> connectionStates = - new Dictionary<ConnectionId, ConnectionState>(); + protected Dictionary<ConnectionId, ConnectionState> connectionStates = new Dictionary<ConnectionId, ConnectionState>(); private bool _restoreConsumers = true; @@ -67,10 +66,10 @@ namespace Apache.NMS.Stomp.State { transport.Oneway(connectionState.Info); - if(RestoreConsumers) - { - DoRestoreConsumers(transport, connectionState); - } + if(RestoreConsumers) + { + DoRestoreConsumers(transport, connectionState); + } } } @@ -97,10 +96,11 @@ namespace Apache.NMS.Stomp.State ConnectionId connectionId = sessionId.ParentId; if(connectionId != null) { - ConnectionState cs = connectionStates[connectionId]; - if(cs != null) + ConnectionState cs = null; + + if(connectionStates.TryGetValue(connectionId, out cs)) { - cs.addConsumer(info); + cs.addConsumer(info); } } } @@ -118,8 +118,9 @@ namespace Apache.NMS.Stomp.State ConnectionId connectionId = sessionId.ParentId; 
if(connectionId != null) { - ConnectionState cs = connectionStates[connectionId]; - if(cs != null) + ConnectionState cs = null; + + if(connectionStates.TryGetValue(connectionId, out cs)) { cs.removeConsumer(id); } http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/3d2d37af/src/main/csharp/State/SynchronizedObjects.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/State/SynchronizedObjects.cs b/src/main/csharp/State/SynchronizedObjects.cs index 24a23a8..2795503 100644 --- a/src/main/csharp/State/SynchronizedObjects.cs +++ b/src/main/csharp/State/SynchronizedObjects.cs @@ -177,6 +177,14 @@ namespace Apache.NMS.Stomp.State } } + public bool TryGetValue(TKey key, out TValue val) + { + lock(((ICollection) _dictionary).SyncRoot) + { + return _dictionary.TryGetValue(key, out val); + } + } + public AtomicCollection<TKey> Keys { get http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/3d2d37af/src/main/csharp/Transport/MutexTransport.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/Transport/MutexTransport.cs b/src/main/csharp/Transport/MutexTransport.cs index c5cbc31..540addd 100644 --- a/src/main/csharp/Transport/MutexTransport.cs +++ b/src/main/csharp/Transport/MutexTransport.cs @@ -20,6 +20,7 @@ using Apache.NMS.Stomp.Commands; namespace Apache.NMS.Stomp.Transport { + /// <summary> /// A Transport which guards access to the next transport using a mutex. /// </summary> public class MutexTransport : TransportFilter @@ -31,6 +32,7 @@ namespace Apache.NMS.Stomp.Transport if(timeout > 0) { DateTime timeoutTime = DateTime.Now + TimeSpan.FromMilliseconds(timeout); + int waitCount = 1; while(true) { @@ -44,7 +46,9 @@ namespace Apache.NMS.Stomp.Transport throw new IOException(string.Format("Oneway timed out after {0} milliseconds.", timeout)); } - Thread.Sleep(10); + // Back off from being overly aggressive. 
Having too many threads + // aggressively trying to get the lock pegs the CPU. + Thread.Sleep(3 * (waitCount++)); } } else