Refactor the CheckConnected function to handle multiple threads attempting to
check connection status against an offline broker. Guard against unwanted
exceptions being thrown when looking up entries in the connection state
dictionaries that have not been fully populated because the broker is offline.

Fixes [AMQNET-331]. (See https://issues.apache.org/jira/browse/AMQNET-331)



Project: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/commit/3d2d37af
Tree: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/tree/3d2d37af
Diff: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/diff/3d2d37af

Branch: refs/heads/1.5.x
Commit: 3d2d37af432882cbc44713d46c1acf4cc8208a01
Parents: 67916c2
Author: Jim Gomes <jgo...@apache.org>
Authored: Fri Jun 17 23:58:06 2011 +0000
Committer: Jim Gomes <jgo...@apache.org>
Committed: Fri Jun 17 23:58:06 2011 +0000

----------------------------------------------------------------------
 src/main/csharp/Connection.cs                   | 71 +++++++++++++++-----
 src/main/csharp/State/ConnectionState.cs        | 38 +++++------
 src/main/csharp/State/ConnectionStateTracker.cs | 23 ++++---
 src/main/csharp/State/SynchronizedObjects.cs    |  8 +++
 src/main/csharp/Transport/MutexTransport.cs     |  6 +-
 5 files changed, 99 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/3d2d37af/src/main/csharp/Connection.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Connection.cs b/src/main/csharp/Connection.cs
index ffaf180..4ca6aa5 100755
--- a/src/main/csharp/Connection.cs
+++ b/src/main/csharp/Connection.cs
@@ -501,14 +501,7 @@ namespace Apache.NMS.Stomp
 
         public Response SyncRequest(Command command)
         {
-            try
-            {
-                return SyncRequest(command, this.RequestTimeout);
-            }
-            catch(Exception ex)
-            {
-                throw NMSExceptionSupport.Create(ex);
-            }
+            return SyncRequest(command, this.RequestTimeout);
         }
 
         public Response SyncRequest(Command command, TimeSpan requestTimeout)
@@ -546,7 +539,13 @@ namespace Apache.NMS.Stomp
             }
         }
 
-        protected void CheckConnected()
+        private object checkConnectedLock = new object();
+
+        /// <summary>
+        /// Check and ensure that the connection objcet is connected.  If it 
is not
+        /// connected or is closed, a ConnectionClosedException is thrown.
+        /// </summary>
+        internal void CheckConnected()
         {
             if(closed.Value)
             {
@@ -555,17 +554,57 @@ namespace Apache.NMS.Stomp
 
             if(!connected.Value)
             {
-                if(!this.userSpecifiedClientID)
+                DateTime timeoutTime = DateTime.Now + this.RequestTimeout;
+                int waitCount = 1;
+
+                while(true)
                 {
-                    this.info.ClientId = this.clientIdGenerator.GenerateId();
+                    if(Monitor.TryEnter(checkConnectedLock))
+                    {
+                        try
+                        {
+                            if(!connected.Value)
+                            {
+                                if(!this.userSpecifiedClientID)
+                                {
+                                    this.info.ClientId = 
this.clientIdGenerator.GenerateId();
+                                }
+
+                                try
+                                {
+                                    if(null != transport)
+                                    {
+                                        // Send the connection and see if an 
ack/nak is returned.
+                                        Response response = 
transport.Request(this.info, this.RequestTimeout);
+                                        if(!(response is ExceptionResponse))
+                                        {
+                                            connected.Value = true;
+                                        }
+                                    }
+                                }
+                                catch
+                                {
+                                }
+                            }
+                        }
+                        finally
+                        {
+                            Monitor.Exit(checkConnectedLock);
+                        }
+                    }
+
+                    if(connected.Value || DateTime.Now > timeoutTime)
+                    {
+                        break;
+                    }
+
+                    // Back off from being overly aggressive.  Having too many 
threads
+                    // aggressively trying to connect to a down broker pegs 
the CPU.
+                    Thread.Sleep(5 * (waitCount++));
                 }
 
-                connected.Value = true;
-                // now lets send the connection and see if we get an ack/nak
-                if(null == SyncRequest(info))
+                if(!connected.Value)
                 {
-                    closed.Value = true;
-                    connected.Value = false;
                     throw new ConnectionClosedException();
                 }
             }

http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/3d2d37af/src/main/csharp/State/ConnectionState.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/State/ConnectionState.cs b/src/main/csharp/State/ConnectionState.cs
index d568a47..c39d67f 100644
--- a/src/main/csharp/State/ConnectionState.cs
+++ b/src/main/csharp/State/ConnectionState.cs
@@ -25,8 +25,7 @@ namespace Apache.NMS.Stomp.State
        {
 
                ConnectionInfo info;
-               private readonly AtomicDictionary<ConsumerId, ConsumerState> 
consumers =
-            new AtomicDictionary<ConsumerId, ConsumerState>();
+               private readonly AtomicDictionary<ConsumerId, ConsumerState> 
consumers = new AtomicDictionary<ConsumerId, ConsumerState>();
                private readonly Atomic<bool> _shutdown = new 
Atomic<bool>(false);
 
                public ConnectionState(ConnectionInfo info)
@@ -49,26 +48,25 @@ namespace Apache.NMS.Stomp.State
                {
                        get
                        {
-                               #if DEBUG
-                               try
+                               ConsumerState consumerState;
+                               
+                               if(consumers.TryGetValue(id, out consumerState))
                                {
-                               #endif
-                                       return consumers[id];
-                               #if DEBUG
+                                       return consumerState;
                                }
-                               
catch(System.Collections.Generic.KeyNotFoundException ex)
+                               
+#if DEBUG
+                               // Useful for dignosing missing consumer ids
+                               string consumerList = string.Empty;
+                               foreach(ConsumerId consumerId in consumers.Keys)
                                {
-                                       // Useful for dignosing missing 
consumer ids
-                                       string consumerList = string.Empty;
-                                       foreach(ConsumerId consumerId in 
consumers.Keys)
-                                       {
-                                               consumerList += 
consumerId.ToString() + "\n";
-                                       }
-                                       System.Diagnostics.Debug.Assert(false,
-                                               string.Format("Consumer '{0}' 
did not exist in the consumers collection.\n\nConsumers:-\n{1}", id, 
consumerList));
-                                       throw ex;
+                                       consumerList += consumerId.ToString() + 
"\n";
                                }
-                               #endif
+                               
+                               System.Diagnostics.Debug.Assert(false,
+                                       string.Format("Consumer '{0}' did not 
exist in the consumers collection.\n\nConsumers:-\n{1}", id, consumerList));
+#endif
+                               return null;
                        }
                }
 
@@ -80,7 +78,9 @@ namespace Apache.NMS.Stomp.State
 
                public ConsumerState removeConsumer(ConsumerId id)
                {
-                       ConsumerState ret = consumers[id];
+                       ConsumerState ret = null;
+                       
+                       consumers.TryGetValue(id, out ret);
                        consumers.Remove(id);
                        return ret;
                }

http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/3d2d37af/src/main/csharp/State/ConnectionStateTracker.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/State/ConnectionStateTracker.cs b/src/main/csharp/State/ConnectionStateTracker.cs
index d9dbc9d..ccf76a8 100644
--- a/src/main/csharp/State/ConnectionStateTracker.cs
+++ b/src/main/csharp/State/ConnectionStateTracker.cs
@@ -31,8 +31,7 @@ namespace Apache.NMS.Stomp.State
        {
                private static readonly Tracked TRACKED_RESPONSE_MARKER = new 
Tracked(null);
 
-               protected Dictionary<ConnectionId, ConnectionState> 
connectionStates =
-            new Dictionary<ConnectionId, ConnectionState>();
+               protected Dictionary<ConnectionId, ConnectionState> 
connectionStates = new Dictionary<ConnectionId, ConnectionState>();
 
                private bool _restoreConsumers = true;
 
@@ -67,10 +66,10 @@ namespace Apache.NMS.Stomp.State
                        {
                                transport.Oneway(connectionState.Info);
 
-                if(RestoreConsumers)
-                {
-                    DoRestoreConsumers(transport, connectionState);
-                }
+                               if(RestoreConsumers)
+                               {
+                                       DoRestoreConsumers(transport, 
connectionState);
+                               }
                        }
                }
 
@@ -97,10 +96,11 @@ namespace Apache.NMS.Stomp.State
                                        ConnectionId connectionId = 
sessionId.ParentId;
                                        if(connectionId != null)
                                        {
-                                               ConnectionState cs = 
connectionStates[connectionId];
-                                               if(cs != null)
+                                               ConnectionState cs = null;
+                                               
+                                               
if(connectionStates.TryGetValue(connectionId, out cs))
                                                {
-                                                   cs.addConsumer(info);
+                                                       cs.addConsumer(info);
                                                }
                                        }
                                }
@@ -118,8 +118,9 @@ namespace Apache.NMS.Stomp.State
                                        ConnectionId connectionId = 
sessionId.ParentId;
                                        if(connectionId != null)
                                        {
-                                               ConnectionState cs = 
connectionStates[connectionId];
-                                               if(cs != null)
+                                               ConnectionState cs = null;
+                                               
+                                               
if(connectionStates.TryGetValue(connectionId, out cs))
                                                {
                                                        cs.removeConsumer(id);
                                                }

http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/3d2d37af/src/main/csharp/State/SynchronizedObjects.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/State/SynchronizedObjects.cs b/src/main/csharp/State/SynchronizedObjects.cs
index 24a23a8..2795503 100644
--- a/src/main/csharp/State/SynchronizedObjects.cs
+++ b/src/main/csharp/State/SynchronizedObjects.cs
@@ -177,6 +177,14 @@ namespace Apache.NMS.Stomp.State
                        }
                }
 
+               public bool TryGetValue(TKey key, out TValue val)
+               {
+                       lock(((ICollection) _dictionary).SyncRoot)
+                       {
+                               return _dictionary.TryGetValue(key, out val);
+                       }
+               }
+
                public AtomicCollection<TKey> Keys
                {
                        get

http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/3d2d37af/src/main/csharp/Transport/MutexTransport.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Transport/MutexTransport.cs b/src/main/csharp/Transport/MutexTransport.cs
index c5cbc31..540addd 100644
--- a/src/main/csharp/Transport/MutexTransport.cs
+++ b/src/main/csharp/Transport/MutexTransport.cs
@@ -20,6 +20,7 @@ using Apache.NMS.Stomp.Commands;
 
 namespace Apache.NMS.Stomp.Transport
 {
+       /// <summary>
        /// A Transport which guards access to the next transport using a mutex.
        /// </summary>
        public class MutexTransport : TransportFilter
@@ -31,6 +32,7 @@ namespace Apache.NMS.Stomp.Transport
                        if(timeout > 0)
                        {
                                DateTime timeoutTime = DateTime.Now + 
TimeSpan.FromMilliseconds(timeout);
+                               int waitCount = 1;
 
                                while(true)
                                {
@@ -44,7 +46,9 @@ namespace Apache.NMS.Stomp.Transport
                                                throw new 
IOException(string.Format("Oneway timed out after {0} milliseconds.", timeout));
                                        }
 
-                                       Thread.Sleep(10);
+                                       // Back off from being overly 
aggressive.  Having too many threads
+                                       // aggressively trying to get the lock 
pegs the CPU.
+                                       Thread.Sleep(3 * (waitCount++));
                                }
                        }
                        else

Reply via email to