Author: tabish
Date: Mon Aug 11 17:56:01 2014
New Revision: 1617335

URL: http://svn.apache.org/r1617335
Log:
Apply fix for potential deadlock on restore with active pull requests.
Fixes [AMQNET-487]. (See 
https://issues.apache.org/jira/browse/AMQNET-487)

Modified:
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs?rev=1617335&r1=1617334&r2=1617335&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ConnectionStateTracker.cs
 Mon Aug 11 17:56:01 2014
@@ -32,8 +32,8 @@ namespace Apache.NMS.ActiveMQ.State
     {
         private static readonly Tracked TRACKED_RESPONSE_MARKER = new 
Tracked(null);
 
-        protected readonly Dictionary<ConnectionId, ConnectionState> 
connectionStates = 
-                       new Dictionary<ConnectionId, ConnectionState>();
+        protected readonly Dictionary<ConnectionId, ConnectionState> 
connectionStates =
+            new Dictionary<ConnectionId, ConnectionState>();
 
         private bool isTrackTransactions;
         private bool isTrackTransactionProducers = true;
@@ -60,10 +60,10 @@ namespace Apache.NMS.ActiveMQ.State
             {
                 ConnectionState cs;
 
-                               
if(cst.connectionStates.TryGetValue(info.ConnectionId, out cs))
-                               {
-                                       
cs.RemoveTransactionState(info.TransactionId);
-                               }
+                if(cst.connectionStates.TryGetValue(info.ConnectionId, out cs))
+                {
+                    cs.RemoveTransactionState(info.TransactionId);
+                }
             }
         }
 
@@ -148,8 +148,8 @@ namespace Apache.NMS.ActiveMQ.State
                         {
                             if (Tracer.IsDebugEnabled)
                             {
-                                Tracer.Debug("rolling back potentially 
completed tx: " + 
-                                                                            
transactionState.Id);
+                                Tracer.Debug("rolling back potentially 
completed tx: " +
+                                             transactionState.Id);
                             }
                             toRollback.Add(transactionInfo);
                             continue;
@@ -167,7 +167,7 @@ namespace Apache.NMS.ActiveMQ.State
                     }
                     transport.Oneway(producerState.Info);
                 }
-    
+
                 foreach (Command command in transactionState.Commands)
                 {
                     if (Tracer.IsDebugEnabled)
@@ -176,7 +176,7 @@ namespace Apache.NMS.ActiveMQ.State
                     }
                     transport.Oneway(command);
                 }
-    
+
                 foreach (ProducerState producerState in 
transactionState.ProducerStates)
                 {
                     if (Tracer.IsDebugEnabled)
@@ -239,15 +239,15 @@ namespace Apache.NMS.ActiveMQ.State
             // Restore the session's consumers but possibly in pull only 
(prefetch 0 state) till
             // recovery completes.
 
-                       ConnectionState connectionState = null;
-                       bool connectionInterruptionProcessingComplete = false;
+            ConnectionState connectionState = null;
+            bool connectionInterruptionProcessingComplete = false;
 
-                       
if(connectionStates.TryGetValue(sessionState.Info.SessionId.ParentId, out 
connectionState))
-                       {
-                               connectionInterruptionProcessingComplete = 
connectionState.ConnectionInterruptProcessingComplete;
-                       }
-                       
-                       // Restore the session's consumers
+            
if(connectionStates.TryGetValue(sessionState.Info.SessionId.ParentId, out 
connectionState))
+            {
+                connectionInterruptionProcessingComplete = 
connectionState.ConnectionInterruptProcessingComplete;
+            }
+
+            // Restore the session's consumers
             foreach(ConsumerState consumerState in sessionState.ConsumerStates)
             {
                 ConsumerInfo infoToSend = consumerState.Info;
@@ -255,13 +255,13 @@ namespace Apache.NMS.ActiveMQ.State
                 if(!connectionInterruptionProcessingComplete && 
infoToSend.PrefetchSize > 0 && transport.WireFormat.Version > 5)
                 {
                     infoToSend = consumerState.Info.Clone() as ConsumerInfo;
-                                       lock(((ICollection) 
connectionState.RecoveringPullConsumers).SyncRoot)
-                                       {
-                                               
if(!connectionState.RecoveringPullConsumers.ContainsKey(infoToSend.ConsumerId))
-                                               {
-                                                       
connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, 
consumerState.Info);
-                                               }
-                                       }
+                    lock(((ICollection) 
connectionState.RecoveringPullConsumers).SyncRoot)
+                    {
+                        
if(!connectionState.RecoveringPullConsumers.ContainsKey(infoToSend.ConsumerId))
+                        {
+                            
connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, 
consumerState.Info);
+                        }
+                    }
                     infoToSend.PrefetchSize = 0;
                     if(Tracer.IsDebugEnabled)
                     {
@@ -316,10 +316,10 @@ namespace Apache.NMS.ActiveMQ.State
             {
                 ConnectionState cs;
 
-                               
if(connectionStates.TryGetValue(info.ConnectionId, out cs))
-                               {
-                                       cs.AddTempDestination(info);
-                               }
+                if(connectionStates.TryGetValue(info.ConnectionId, out cs))
+                {
+                    cs.AddTempDestination(info);
+                }
             }
             return TRACKED_RESPONSE_MARKER;
         }
@@ -329,8 +329,8 @@ namespace Apache.NMS.ActiveMQ.State
             if(info != null && info.Destination.IsTemporary)
             {
                 ConnectionState cs;
-                               
if(connectionStates.TryGetValue(info.ConnectionId, out cs))
-                               {
+                if(connectionStates.TryGetValue(info.ConnectionId, out cs))
+                {
                     cs.RemoveTempDestination(info.Destination);
                 }
             }
@@ -348,8 +348,8 @@ namespace Apache.NMS.ActiveMQ.State
                     if(connectionId != null)
                     {
                         ConnectionState cs;
-                                               
-                                               
if(connectionStates.TryGetValue(connectionId, out cs))
+
+                        if(connectionStates.TryGetValue(connectionId, out cs))
                         {
                             SessionState ss = cs[sessionId];
                             if(ss != null)
@@ -373,9 +373,9 @@ namespace Apache.NMS.ActiveMQ.State
                     ConnectionId connectionId = sessionId.ParentId;
                     if(connectionId != null)
                     {
-                                               ConnectionState cs = null;
-                                               
-                                               
if(connectionStates.TryGetValue(connectionId, out cs))
+                        ConnectionState cs = null;
+
+                        if(connectionStates.TryGetValue(connectionId, out cs))
                         {
                             SessionState ss = cs[sessionId];
                             if(ss != null)
@@ -399,9 +399,9 @@ namespace Apache.NMS.ActiveMQ.State
                     ConnectionId connectionId = sessionId.ParentId;
                     if(connectionId != null)
                     {
-                                               ConnectionState cs = null;
+                        ConnectionState cs = null;
 
-                                               
if(connectionStates.TryGetValue(connectionId, out cs))
+                        if(connectionStates.TryGetValue(connectionId, out cs))
                         {
                             SessionState ss = cs[sessionId];
                             if(ss != null)
@@ -425,9 +425,9 @@ namespace Apache.NMS.ActiveMQ.State
                     ConnectionId connectionId = sessionId.ParentId;
                     if(connectionId != null)
                     {
-                                               ConnectionState cs = null;
+                        ConnectionState cs = null;
 
-                                               
if(connectionStates.TryGetValue(connectionId, out cs))
+                        if(connectionStates.TryGetValue(connectionId, out cs))
                         {
                             SessionState ss = cs[sessionId];
                             if(ss != null)
@@ -435,7 +435,7 @@ namespace Apache.NMS.ActiveMQ.State
                                 ss.RemoveConsumer(id);
                             }
 
-                                                       
cs.RecoveringPullConsumers.Remove(id);
+                            cs.RecoveringPullConsumers.Remove(id);
                         }
                     }
                 }
@@ -450,9 +450,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = info.SessionId.ParentId;
                 if(connectionId != null)
                 {
-                                       ConnectionState cs = null;
+                    ConnectionState cs = null;
 
-                                       
if(connectionStates.TryGetValue(connectionId, out cs))
+                    if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         cs.AddSession(info);
                     }
@@ -468,9 +468,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = id.ParentId;
                 if(connectionId != null)
                 {
-                                       ConnectionState cs = null;
+                    ConnectionState cs = null;
 
-                                       
if(connectionStates.TryGetValue(connectionId, out cs))
+                    if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         cs.RemoveSession(id);
                     }
@@ -483,16 +483,16 @@ namespace Apache.NMS.ActiveMQ.State
         {
             if(info != null)
             {
-                               ConnectionState connState = new 
ConnectionState(info);
+                ConnectionState connState = new ConnectionState(info);
 
-                               
if(connectionStates.ContainsKey(info.ConnectionId))
-                               {
-                                       connectionStates[info.ConnectionId] = 
connState;
-                               }
-                               else
-                               {
-                                       connectionStates.Add(info.ConnectionId, 
connState);
-                               }
+                if(connectionStates.ContainsKey(info.ConnectionId))
+                {
+                    connectionStates[info.ConnectionId] = connState;
+                }
+                else
+                {
+                    connectionStates.Add(info.ConnectionId, connState);
+                }
             }
 
             return TRACKED_RESPONSE_MARKER;
@@ -517,9 +517,9 @@ namespace Apache.NMS.ActiveMQ.State
                     ConnectionId connectionId = producerId.ParentId.ParentId;
                     if(connectionId != null)
                     {
-                                               ConnectionState cs = null;
+                        ConnectionState cs = null;
 
-                                               
if(connectionStates.TryGetValue(connectionId, out cs))
+                        if(connectionStates.TryGetValue(connectionId, out cs))
                         {
                             TransactionState transactionState = 
cs[send.TransactionId];
                             if(transactionState != null)
@@ -552,9 +552,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = ack.ConsumerId.ParentId.ParentId;
                 if(connectionId != null)
                 {
-                                       ConnectionState cs = null;
+                    ConnectionState cs = null;
 
-                                       
if(connectionStates.TryGetValue(connectionId, out cs))
+                    if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         TransactionState transactionState = 
cs[ack.TransactionId];
                         if(transactionState != null)
@@ -575,9 +575,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = info.ConnectionId;
                 if(connectionId != null)
                 {
-                                       ConnectionState cs = null;
+                    ConnectionState cs = null;
 
-                                       
if(connectionStates.TryGetValue(connectionId, out cs))
+                    if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         cs.AddTransactionState(info.TransactionId);
                         TransactionState state = cs[info.TransactionId];
@@ -596,9 +596,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = info.ConnectionId;
                 if(connectionId != null)
                 {
-                                       ConnectionState cs = null;
+                    ConnectionState cs = null;
 
-                                       
if(connectionStates.TryGetValue(connectionId, out cs))
+                    if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         TransactionState transactionState = 
cs[info.TransactionId];
                         if(transactionState != null)
@@ -619,9 +619,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = info.ConnectionId;
                 if(connectionId != null)
                 {
-                                       ConnectionState cs = null;
+                    ConnectionState cs = null;
 
-                                       
if(connectionStates.TryGetValue(connectionId, out cs))
+                    if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         TransactionState transactionState = 
cs[info.TransactionId];
                         if(transactionState != null)
@@ -642,9 +642,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = info.ConnectionId;
                 if(connectionId != null)
                 {
-                                       ConnectionState cs = null;
+                    ConnectionState cs = null;
 
-                                       
if(connectionStates.TryGetValue(connectionId, out cs))
+                    if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         TransactionState transactionState = 
cs[info.TransactionId];
                         if(transactionState != null)
@@ -665,9 +665,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = info.ConnectionId;
                 if(connectionId != null)
                 {
-                                       ConnectionState cs = null;
+                    ConnectionState cs = null;
 
-                                       
if(connectionStates.TryGetValue(connectionId, out cs))
+                    if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         TransactionState transactionState = 
cs[info.TransactionId];
                         if(transactionState != null)
@@ -688,9 +688,9 @@ namespace Apache.NMS.ActiveMQ.State
                 ConnectionId connectionId = info.ConnectionId;
                 if(connectionId != null)
                 {
-                                       ConnectionState cs = null;
+                    ConnectionState cs = null;
 
-                                       
if(connectionStates.TryGetValue(connectionId, out cs))
+                    if(connectionStates.TryGetValue(connectionId, out cs))
                     {
                         TransactionState transactionState = 
cs[info.TransactionId];
                         if(transactionState != null)
@@ -760,57 +760,61 @@ namespace Apache.NMS.ActiveMQ.State
         public int MaxCacheSize
         {
             get { return maxCacheSize; }
-            set 
-                       { 
-                               this.maxCacheSize = value; 
-                               this.messageCache.MaxCacheSize = maxCacheSize;
-                       }
+            set
+            {
+                this.maxCacheSize = value;
+                this.messageCache.MaxCacheSize = maxCacheSize;
+            }
         }
 
         public void ConnectionInterruptProcessingComplete(ITransport 
transport, ConnectionId connectionId)
         {
-                       ConnectionState connectionState = null;
+            ConnectionState connectionState = null;
 
-                       if(connectionStates.TryGetValue(connectionId, out 
connectionState))
+            if(connectionStates.TryGetValue(connectionId, out connectionState))
             {
                 connectionState.ConnectionInterruptProcessingComplete = true;
 
-                               lock(((ICollection) 
connectionState.RecoveringPullConsumers).SyncRoot)
-                               {
-                                       foreach(KeyValuePair<ConsumerId, 
ConsumerInfo> entry in connectionState.RecoveringPullConsumers)
-                                       {
-                                               ConsumerControl control = new 
ConsumerControl();
-                                               control.ConsumerId = entry.Key;
-                                               control.Prefetch = 
entry.Value.PrefetchSize;
-                                               control.Destination = 
entry.Value.Destination;
-                                               try
-                                               {
-                                                       
if(Tracer.IsDebugEnabled)
-                                                       {
-                                                               
Tracer.Debug("restored recovering consumer: " + control.ConsumerId +
-                                                                               
         " with: " + control.Prefetch);
-                                                       }
-                                                       
transport.Oneway(control);
-                                               }
-                                               catch(Exception ex)
-                                               {
-                                                       
if(Tracer.IsDebugEnabled)
-                                                       {
-                                                               
Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId +
-                                                                               
         " with: " + control.Prefetch + "Error: " + ex.Message);
-                                                       }
-                                               }
-                                       }
-                                       
connectionState.RecoveringPullConsumers.Clear();
-                               }
+                Dictionary<ConsumerId, ConsumerInfo> 
consumersToRestorePrefetchOn;
+
+                lock(((ICollection) 
connectionState.RecoveringPullConsumers).SyncRoot)
+                {
+                    consumersToRestorePrefetchOn = new Dictionary<ConsumerId, 
ConsumerInfo>(connectionState.RecoveringPullConsumers);
+                    connectionState.RecoveringPullConsumers.Clear();
+                }
+
+                foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in 
consumersToRestorePrefetchOn)
+                {
+                    ConsumerControl control = new ConsumerControl();
+                    control.ConsumerId = entry.Key;
+                    control.Prefetch = entry.Value.PrefetchSize;
+                    control.Destination = entry.Value.Destination;
+                    try
+                    {
+                        if(Tracer.IsDebugEnabled)
+                        {
+                            Tracer.Debug("restored recovering consumer: " + 
control.ConsumerId +
+                                         " with: " + control.Prefetch);
+                        }
+                        transport.Oneway(control);
+                    }
+                    catch(Exception ex)
+                    {
+                        if(Tracer.IsDebugEnabled)
+                        {
+                            Tracer.Debug("Failed to submit control for 
consumer: " + control.ConsumerId +
+                                         " with: " + control.Prefetch + 
"Error: " + ex.Message);
+                        }
+                    }
+                }
             }
         }
 
         public void TransportInterrupted(ConnectionId id)
         {
-                       ConnectionState connection = null;
+            ConnectionState connection = null;
 
-                       if(connectionStates.TryGetValue(id, out connection))
+            if(connectionStates.TryGetValue(id, out connection))
             {
                 connection.ConnectionInterruptProcessingComplete = false;
             }


Reply via email to