Author: tabish Date: Mon Aug 11 17:57:24 2014 New Revision: 1617336 URL: http://svn.apache.org/r1617336 Log: Apply fix for potential deadlock on restore with active pull requests. Fixes
[AMQNET-487]. (See https://issues.apache.org/jira/browse/AMQNET-487) Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/ (props changed) activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/State/ConnectionStateTracker.cs Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/ ------------------------------------------------------------------------------ Merged /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk:r1617335 Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/State/ConnectionStateTracker.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/State/ConnectionStateTracker.cs?rev=1617336&r1=1617335&r2=1617336&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/State/ConnectionStateTracker.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.6.x/src/main/csharp/State/ConnectionStateTracker.cs Mon Aug 11 17:57:24 2014 @@ -32,8 +32,8 @@ namespace Apache.NMS.ActiveMQ.State { private static readonly Tracked TRACKED_RESPONSE_MARKER = new Tracked(null); - protected readonly Dictionary<ConnectionId, ConnectionState> connectionStates = - new Dictionary<ConnectionId, ConnectionState>(); + protected readonly Dictionary<ConnectionId, ConnectionState> connectionStates = + new Dictionary<ConnectionId, ConnectionState>(); private bool isTrackTransactions; private bool isTrackTransactionProducers = true; @@ -60,10 +60,10 @@ namespace Apache.NMS.ActiveMQ.State { ConnectionState cs; - if(cst.connectionStates.TryGetValue(info.ConnectionId, out cs)) - { - cs.RemoveTransactionState(info.TransactionId); - } + if(cst.connectionStates.TryGetValue(info.ConnectionId, out cs)) + { + cs.RemoveTransactionState(info.TransactionId); + } } } @@ -148,8 +148,8 @@ namespace Apache.NMS.ActiveMQ.State { if (Tracer.IsDebugEnabled) { - Tracer.Debug("rolling back potentially completed tx: " + - transactionState.Id); + Tracer.Debug("rolling back potentially completed tx: " + + transactionState.Id); } toRollback.Add(transactionInfo); continue; @@ -167,7 +167,7 @@ namespace Apache.NMS.ActiveMQ.State } transport.Oneway(producerState.Info); } - + foreach (Command command in transactionState.Commands) { if (Tracer.IsDebugEnabled) @@ -176,7 +176,7 @@ namespace Apache.NMS.ActiveMQ.State } transport.Oneway(command); } - + foreach (ProducerState producerState in transactionState.ProducerStates) { if (Tracer.IsDebugEnabled) @@ -239,15 +239,15 @@ namespace Apache.NMS.ActiveMQ.State // Restore the session's consumers but possibly in pull only (prefetch 0 state) till // recovery completes. - ConnectionState connectionState = null; - bool connectionInterruptionProcessingComplete = false; + ConnectionState connectionState = null; + bool connectionInterruptionProcessingComplete = false; - if(connectionStates.TryGetValue(sessionState.Info.SessionId.ParentId, out connectionState)) - { - connectionInterruptionProcessingComplete = connectionState.ConnectionInterruptProcessingComplete; - } - - // Restore the session's consumers + if(connectionStates.TryGetValue(sessionState.Info.SessionId.ParentId, out connectionState)) + { + connectionInterruptionProcessingComplete = connectionState.ConnectionInterruptProcessingComplete; + } + + // Restore the session's consumers foreach(ConsumerState consumerState in sessionState.ConsumerStates) { ConsumerInfo infoToSend = consumerState.Info; @@ -255,13 +255,13 @@ namespace Apache.NMS.ActiveMQ.State if(!connectionInterruptionProcessingComplete && infoToSend.PrefetchSize > 0 && transport.WireFormat.Version > 5) { infoToSend = consumerState.Info.Clone() as ConsumerInfo; - lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot) - { - if(!connectionState.RecoveringPullConsumers.ContainsKey(infoToSend.ConsumerId)) - { - connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info); - } - } + lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot) + { + if(!connectionState.RecoveringPullConsumers.ContainsKey(infoToSend.ConsumerId)) + { + connectionState.RecoveringPullConsumers.Add(infoToSend.ConsumerId, consumerState.Info); + } + } infoToSend.PrefetchSize = 0; if(Tracer.IsDebugEnabled) { @@ -316,10 +316,10 @@ namespace Apache.NMS.ActiveMQ.State { ConnectionState cs; - if(connectionStates.TryGetValue(info.ConnectionId, out cs)) - { - cs.AddTempDestination(info); - } + if(connectionStates.TryGetValue(info.ConnectionId, out cs)) + { + cs.AddTempDestination(info); + } } return TRACKED_RESPONSE_MARKER; } @@ -329,8 +329,8 @@ namespace Apache.NMS.ActiveMQ.State if(info != null && info.Destination.IsTemporary) { ConnectionState cs; - if(connectionStates.TryGetValue(info.ConnectionId, out cs)) - { + if(connectionStates.TryGetValue(info.ConnectionId, out cs)) + { cs.RemoveTempDestination(info.Destination); } } @@ -348,8 +348,8 @@ namespace Apache.NMS.ActiveMQ.State if(connectionId != null) { ConnectionState cs; - - if(connectionStates.TryGetValue(connectionId, out cs)) + + if(connectionStates.TryGetValue(connectionId, out cs)) { SessionState ss = cs[sessionId]; if(ss != null) @@ -373,9 +373,9 @@ namespace Apache.NMS.ActiveMQ.State ConnectionId connectionId = sessionId.ParentId; if(connectionId != null) { - ConnectionState cs = null; - - if(connectionStates.TryGetValue(connectionId, out cs)) + ConnectionState cs = null; + + if(connectionStates.TryGetValue(connectionId, out cs)) { SessionState ss = cs[sessionId]; if(ss != null) @@ -399,9 +399,9 @@ namespace Apache.NMS.ActiveMQ.State ConnectionId connectionId = sessionId.ParentId; if(connectionId != null) { - ConnectionState cs = null; + ConnectionState cs = null; - if(connectionStates.TryGetValue(connectionId, out cs)) + if(connectionStates.TryGetValue(connectionId, out cs)) { SessionState ss = cs[sessionId]; if(ss != null) @@ -425,9 +425,9 @@ namespace Apache.NMS.ActiveMQ.State ConnectionId connectionId = sessionId.ParentId; if(connectionId != null) { - ConnectionState cs = null; + ConnectionState cs = null; - if(connectionStates.TryGetValue(connectionId, out cs)) + if(connectionStates.TryGetValue(connectionId, out cs)) { SessionState ss = cs[sessionId]; if(ss != null) @@ -435,7 +435,7 @@ namespace Apache.NMS.ActiveMQ.State ss.RemoveConsumer(id); } - cs.RecoveringPullConsumers.Remove(id); + cs.RecoveringPullConsumers.Remove(id); } } } @@ -450,9 +450,9 @@ namespace Apache.NMS.ActiveMQ.State ConnectionId connectionId = info.SessionId.ParentId; if(connectionId != null) { - ConnectionState cs = null; + ConnectionState cs = null; - if(connectionStates.TryGetValue(connectionId, out cs)) + if(connectionStates.TryGetValue(connectionId, out cs)) { cs.AddSession(info); } @@ -468,9 +468,9 @@ namespace Apache.NMS.ActiveMQ.State ConnectionId connectionId = id.ParentId; if(connectionId != null) { - ConnectionState cs = null; + ConnectionState cs = null; - if(connectionStates.TryGetValue(connectionId, out cs)) + if(connectionStates.TryGetValue(connectionId, out cs)) { cs.RemoveSession(id); } @@ -483,16 +483,16 @@ namespace Apache.NMS.ActiveMQ.State { if(info != null) { - ConnectionState connState = new ConnectionState(info); + ConnectionState connState = new ConnectionState(info); - if(connectionStates.ContainsKey(info.ConnectionId)) - { - connectionStates[info.ConnectionId] = connState; - } - else - { - connectionStates.Add(info.ConnectionId, connState); - } + if(connectionStates.ContainsKey(info.ConnectionId)) + { + connectionStates[info.ConnectionId] = connState; + } + else + { + connectionStates.Add(info.ConnectionId, connState); + } } return TRACKED_RESPONSE_MARKER; @@ -517,9 +517,9 @@ namespace Apache.NMS.ActiveMQ.State ConnectionId connectionId = producerId.ParentId.ParentId; if(connectionId != null) { - ConnectionState cs = null; + ConnectionState cs = null; - if(connectionStates.TryGetValue(connectionId, out cs)) + if(connectionStates.TryGetValue(connectionId, out cs)) { TransactionState transactionState = cs[send.TransactionId]; if(transactionState != null) @@ -552,9 +552,9 @@ namespace Apache.NMS.ActiveMQ.State ConnectionId connectionId = ack.ConsumerId.ParentId.ParentId; if(connectionId != null) { - ConnectionState cs = null; + ConnectionState cs = null; - if(connectionStates.TryGetValue(connectionId, out cs)) + if(connectionStates.TryGetValue(connectionId, out cs)) { TransactionState transactionState = cs[ack.TransactionId]; if(transactionState != null) @@ -575,9 +575,9 @@ namespace Apache.NMS.ActiveMQ.State ConnectionId connectionId = info.ConnectionId; if(connectionId != null) { - ConnectionState cs = null; + ConnectionState cs = null; - if(connectionStates.TryGetValue(connectionId, out cs)) + if(connectionStates.TryGetValue(connectionId, out cs)) { cs.AddTransactionState(info.TransactionId); TransactionState state = cs[info.TransactionId]; @@ -596,9 +596,9 @@ namespace Apache.NMS.ActiveMQ.State ConnectionId connectionId = info.ConnectionId; if(connectionId != null) { - ConnectionState cs = null; + ConnectionState cs = null; - if(connectionStates.TryGetValue(connectionId, out cs)) + if(connectionStates.TryGetValue(connectionId, out cs)) { TransactionState transactionState = cs[info.TransactionId]; if(transactionState != null) @@ -619,9 +619,9 @@ namespace Apache.NMS.ActiveMQ.State ConnectionId connectionId = info.ConnectionId; if(connectionId != null) { - ConnectionState cs = null; + ConnectionState cs = null; - if(connectionStates.TryGetValue(connectionId, out cs)) + if(connectionStates.TryGetValue(connectionId, out cs)) { TransactionState transactionState = cs[info.TransactionId]; if(transactionState != null) @@ -642,9 +642,9 @@ namespace Apache.NMS.ActiveMQ.State ConnectionId connectionId = info.ConnectionId; if(connectionId != null) { - ConnectionState cs = null; + ConnectionState cs = null; - if(connectionStates.TryGetValue(connectionId, out cs)) + if(connectionStates.TryGetValue(connectionId, out cs)) { TransactionState transactionState = cs[info.TransactionId]; if(transactionState != null) @@ -665,9 +665,9 @@ namespace Apache.NMS.ActiveMQ.State ConnectionId connectionId = info.ConnectionId; if(connectionId != null) { - ConnectionState cs = null; + ConnectionState cs = null; - if(connectionStates.TryGetValue(connectionId, out cs)) + if(connectionStates.TryGetValue(connectionId, out cs)) { TransactionState transactionState = cs[info.TransactionId]; if(transactionState != null) @@ -688,9 +688,9 @@ namespace Apache.NMS.ActiveMQ.State ConnectionId connectionId = info.ConnectionId; if(connectionId != null) { - ConnectionState cs = null; + ConnectionState cs = null; - if(connectionStates.TryGetValue(connectionId, out cs)) + if(connectionStates.TryGetValue(connectionId, out cs)) { TransactionState transactionState = cs[info.TransactionId]; if(transactionState != null) @@ -760,57 +760,61 @@ namespace Apache.NMS.ActiveMQ.State public int MaxCacheSize { get { return maxCacheSize; } - set - { - this.maxCacheSize = value; - this.messageCache.MaxCacheSize = maxCacheSize; - } + set + { + this.maxCacheSize = value; + this.messageCache.MaxCacheSize = maxCacheSize; + } } public void ConnectionInterruptProcessingComplete(ITransport transport, ConnectionId connectionId) { - ConnectionState connectionState = null; + ConnectionState connectionState = null; - if(connectionStates.TryGetValue(connectionId, out connectionState)) + if(connectionStates.TryGetValue(connectionId, out connectionState)) { connectionState.ConnectionInterruptProcessingComplete = true; - lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot) - { - foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in connectionState.RecoveringPullConsumers) - { - ConsumerControl control = new ConsumerControl(); - control.ConsumerId = entry.Key; - control.Prefetch = entry.Value.PrefetchSize; - control.Destination = entry.Value.Destination; - try - { - if(Tracer.IsDebugEnabled) - { - Tracer.Debug("restored recovering consumer: " + control.ConsumerId + - " with: " + control.Prefetch); - } - transport.Oneway(control); - } - catch(Exception ex) - { - if(Tracer.IsDebugEnabled) - { - Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId + - " with: " + control.Prefetch + "Error: " + ex.Message); - } - } - } - connectionState.RecoveringPullConsumers.Clear(); - } + Dictionary<ConsumerId, ConsumerInfo> consumersToRestorePrefetchOn; + + lock(((ICollection) connectionState.RecoveringPullConsumers).SyncRoot) + { + consumersToRestorePrefetchOn = new Dictionary<ConsumerId, ConsumerInfo>(connectionState.RecoveringPullConsumers); + connectionState.RecoveringPullConsumers.Clear(); + } + + foreach(KeyValuePair<ConsumerId, ConsumerInfo> entry in consumersToRestorePrefetchOn) + { + ConsumerControl control = new ConsumerControl(); + control.ConsumerId = entry.Key; + control.Prefetch = entry.Value.PrefetchSize; + control.Destination = entry.Value.Destination; + try + { + if(Tracer.IsDebugEnabled) + { + Tracer.Debug("restored recovering consumer: " + control.ConsumerId + + " with: " + control.Prefetch); + } + transport.Oneway(control); + } + catch(Exception ex) + { + if(Tracer.IsDebugEnabled) + { + Tracer.Debug("Failed to submit control for consumer: " + control.ConsumerId + + " with: " + control.Prefetch + "Error: " + ex.Message); + } + } + } } } public void TransportInterrupted(ConnectionId id) { - ConnectionState connection = null; + ConnectionState connection = null; - if(connectionStates.TryGetValue(id, out connection)) + if(connectionStates.TryGetValue(id, out connection)) { connection.ConnectionInterruptProcessingComplete = false; }
