Author: jgomes
Date: Fri Sep 16 22:54:08 2011
New Revision: 1171844

URL: http://svn.apache.org/viewvc?rev=1171844&view=rev
Log:
Merged revision(s) 1171843 from 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x:
Refactor to minimize the amount of time that a lock is kept.  This will also 
stop the potential of a deadlock from occurring while dispatching a message.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/   (props changed)
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs

Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 16 22:54:08 2011
@@ -1,3 +1,3 @@
-/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x:1082291,1135831,1137081
+/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x:1082291,1135831,1137081,1171843
 /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.0.0:692591,693525
 /activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.1.0:788230,788233,790183

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1171844&r1=1171843&r2=1171844&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
 Fri Sep 16 22:54:08 2011
@@ -596,6 +596,7 @@ namespace Apache.NMS.ActiveMQ
                public virtual void Dispatch(MessageDispatch dispatch)
                {
                        MessageListener listener = this.listener;
+                       bool dispatchMessage = false;
 
                        try
                        {
@@ -623,47 +624,52 @@ namespace Apache.NMS.ActiveMQ
                                        {
                                                if(listener != null && 
this.unconsumedMessages.Running)
                                                {
-                                                       ActiveMQMessage message 
= CreateActiveMQMessage(dispatch);
-
-                                                       
this.BeforeMessageIsConsumed(dispatch);
+                                                       dispatchMessage = true;
+                                               }
+                                               else
+                                               {
+                                                       
this.unconsumedMessages.Enqueue(dispatch);
+                                               }
+                                       }
+                               }
 
-                                                       try
-                                                       {
-                                                               bool expired = 
(!IgnoreExpiration && message.IsExpired());
+                               if(dispatchMessage)
+                               {
+                                       ActiveMQMessage message = 
CreateActiveMQMessage(dispatch);
 
-                                                               if(!expired)
-                                                               {
-                                                                       
listener(message);
-                                                               }
+                                       this.BeforeMessageIsConsumed(dispatch);
 
-                                                               
this.AfterMessageIsConsumed(dispatch, expired);
-                                                       }
-                                                       catch(Exception e)
-                                                       {
-                                                               
if(IsAutoAcknowledgeBatch || IsAutoAcknowledgeEach || IsIndividualAcknowledge)
-                                                               {
-                                                                       // 
Redeliver the message
-                                                               }
-                                                               else
-                                                               {
-                                                                       // 
Transacted or Client ack: Deliver the next message.
-                                                                       
this.AfterMessageIsConsumed(dispatch, false);
-                                                               }
+                                       try
+                                       {
+                                               bool expired = 
(!IgnoreExpiration && message.IsExpired());
 
-                                                               
Tracer.Error(this.info.ConsumerId + " Exception while processing message: " + 
e);
+                                               if(!expired)
+                                               {
+                                                       listener(message);
+                                               }
 
-                                                               // If aborted 
we stop the abort here and let normal processing resume.
-                                                               // This allows 
the session to shutdown normally and ack all messages
-                                                               // that have 
outstanding acks in this consumer.
-                                                               if( 
(Thread.CurrentThread.ThreadState & ThreadState.AbortRequested) == 
ThreadState.AbortRequested)
-                                                               {
-                                                                       
Thread.ResetAbort();
-                                                               }
-                                                       }
+                                               
this.AfterMessageIsConsumed(dispatch, expired);
+                                       }
+                                       catch(Exception e)
+                                       {
+                                               if(IsAutoAcknowledgeBatch || 
IsAutoAcknowledgeEach || IsIndividualAcknowledge)
+                                               {
+                                                       // Redeliver the message
                                                }
                                                else
                                                {
-                                                       
this.unconsumedMessages.Enqueue(dispatch);
+                                                       // Transacted or Client 
ack: Deliver the next message.
+                                                       
this.AfterMessageIsConsumed(dispatch, false);
+                                               }
+
+                                               
Tracer.Error(this.info.ConsumerId + " Exception while processing message: " + 
e);
+
+                                               // If aborted we stop the abort 
here and let normal processing resume.
+                                               // This allows the session to 
shutdown normally and ack all messages
+                                               // that have outstanding acks 
in this consumer.
+                                               
if((Thread.CurrentThread.ThreadState & ThreadState.AbortRequested) == 
ThreadState.AbortRequested)
+                                               {
+                                                       Thread.ResetAbort();
                                                }
                                        }
                                }

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs?rev=1171844&r1=1171843&r2=1171844&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs
 Fri Sep 16 22:54:08 2011
@@ -51,10 +51,7 @@ namespace Apache.NMS.ActiveMQ.Util
         {
             get 
             {
-                lock(this.mutex)
-                {
-                    return this.closed; 
-                }
+                return this.closed; 
             }
             
             set 
@@ -70,10 +67,7 @@ namespace Apache.NMS.ActiveMQ.Util
         {
             get
             {
-                lock(this.mutex)
-                {
-                    return this.running;
-                }
+                return this.running;
             }
             
             set
@@ -89,10 +83,7 @@ namespace Apache.NMS.ActiveMQ.Util
         {
             get
             {
-                lock(mutex)
-                {
-                    return this.size == 0;
-                }
+                return this.size == 0;
             }
         }
 
@@ -100,10 +91,7 @@ namespace Apache.NMS.ActiveMQ.Util
         {
             get
             {
-                lock(mutex)
-                {
-                    return this.size;
-                }
+                return this.size;
             }
         }
 


Reply via email to