Author: jgomes
Date: Fri Sep 16 22:54:08 2011
New Revision: 1171844
URL: http://svn.apache.org/viewvc?rev=1171844&view=rev
Log:
Merged revision(s) 1171843 from
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x:
Refactor to minimize the amount of time that a lock is held. This also
removes a potential deadlock that could occur while dispatching a message.
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/ (props changed)
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs
Propchange: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Sep 16 22:54:08 2011
@@ -1,3 +1,3 @@
-/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x:1082291,1135831,1137081
+/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x:1082291,1135831,1137081,1171843
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.0.0:692591,693525
/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/tags/1.1.0:788230,788233,790183
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1171844&r1=1171843&r2=1171844&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
Fri Sep 16 22:54:08 2011
@@ -596,6 +596,7 @@ namespace Apache.NMS.ActiveMQ
public virtual void Dispatch(MessageDispatch dispatch)
{
MessageListener listener = this.listener;
+ bool dispatchMessage = false;
try
{
@@ -623,47 +624,52 @@ namespace Apache.NMS.ActiveMQ
{
if(listener != null &&
this.unconsumedMessages.Running)
{
- ActiveMQMessage message
= CreateActiveMQMessage(dispatch);
-
-
this.BeforeMessageIsConsumed(dispatch);
+ dispatchMessage = true;
+ }
+ else
+ {
+
this.unconsumedMessages.Enqueue(dispatch);
+ }
+ }
+ }
- try
- {
- bool expired =
(!IgnoreExpiration && message.IsExpired());
+ if(dispatchMessage)
+ {
+ ActiveMQMessage message =
CreateActiveMQMessage(dispatch);
- if(!expired)
- {
-
listener(message);
- }
+ this.BeforeMessageIsConsumed(dispatch);
-
this.AfterMessageIsConsumed(dispatch, expired);
- }
- catch(Exception e)
- {
-
if(IsAutoAcknowledgeBatch || IsAutoAcknowledgeEach || IsIndividualAcknowledge)
- {
- //
Redeliver the message
- }
- else
- {
- //
Transacted or Client ack: Deliver the next message.
-
this.AfterMessageIsConsumed(dispatch, false);
- }
+ try
+ {
+ bool expired =
(!IgnoreExpiration && message.IsExpired());
-
Tracer.Error(this.info.ConsumerId + " Exception while processing message: " +
e);
+ if(!expired)
+ {
+ listener(message);
+ }
- // If aborted
we stop the abort here and let normal processing resume.
- // This allows
the session to shutdown normally and ack all messages
- // that have
outstanding acks in this consumer.
- if(
(Thread.CurrentThread.ThreadState & ThreadState.AbortRequested) ==
ThreadState.AbortRequested)
- {
-
Thread.ResetAbort();
- }
- }
+
this.AfterMessageIsConsumed(dispatch, expired);
+ }
+ catch(Exception e)
+ {
+ if(IsAutoAcknowledgeBatch ||
IsAutoAcknowledgeEach || IsIndividualAcknowledge)
+ {
+ // Redeliver the message
}
else
{
-
this.unconsumedMessages.Enqueue(dispatch);
+ // Transacted or Client
ack: Deliver the next message.
+
this.AfterMessageIsConsumed(dispatch, false);
+ }
+
+
Tracer.Error(this.info.ConsumerId + " Exception while processing message: " +
e);
+
+ // If aborted we stop the abort
here and let normal processing resume.
+ // This allows the session to
shutdown normally and ack all messages
+ // that have outstanding acks
in this consumer.
+
if((Thread.CurrentThread.ThreadState & ThreadState.AbortRequested) ==
ThreadState.AbortRequested)
+ {
+ Thread.ResetAbort();
}
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs?rev=1171844&r1=1171843&r2=1171844&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs
Fri Sep 16 22:54:08 2011
@@ -51,10 +51,7 @@ namespace Apache.NMS.ActiveMQ.Util
{
get
{
- lock(this.mutex)
- {
- return this.closed;
- }
+ return this.closed;
}
set
@@ -70,10 +67,7 @@ namespace Apache.NMS.ActiveMQ.Util
{
get
{
- lock(this.mutex)
- {
- return this.running;
- }
+ return this.running;
}
set
@@ -89,10 +83,7 @@ namespace Apache.NMS.ActiveMQ.Util
{
get
{
- lock(mutex)
- {
- return this.size == 0;
- }
+ return this.size == 0;
}
}
@@ -100,10 +91,7 @@ namespace Apache.NMS.ActiveMQ.Util
{
get
{
- lock(mutex)
- {
- return this.size;
- }
+ return this.size;
}
}