Author: jgomes
Date: Tue Aug 10 23:53:33 2010
New Revision: 984268

URL: http://svn.apache.org/viewvc?rev=984268&view=rev
Log:
Added support for processing expired messages that were received from the 
broker.
Fixes [AMQNET-268]. (See https://issues.apache.org/activemq/browse/AMQNET-268)

Added:
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageConsumerTest.cs
      - copied, changed from r982821, 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/VirtualTopicTest.cs
Modified:
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq-test.csproj
    activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=984268&r1=984267&r2=984268&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
 Tue Aug 10 23:53:33 2010
@@ -97,9 +97,14 @@ namespace Apache.NMS.ActiveMQ
             // If the destination contained a URI query, then use it to set public properties
             // on the ConsumerInfo
             if(destination.Options != null)
-            {                          
+            {
+                               // Get options prefixed with "consumer.*"
                                StringDictionary options = 
URISupport.GetProperties(destination.Options, "consumer.");
+                               // Extract out custom extension options "consumer.nms.*"
+                               StringDictionary customConsumerOptions = 
URISupport.ExtractProperties(options, "nms.");
+
                                URISupport.SetProperties(this.info, options);
+                               URISupport.SetProperties(this, 
customConsumerOptions, "nms.");
             }
                }
 
@@ -145,7 +150,15 @@ namespace Apache.NMS.ActiveMQ
         public long UnconsumedMessageCount
         {
             get { return this.unconsumedMessages.Count; }
-        }   
+        }
+
+               // Custom Options
+               private bool ignoreExpiration = false;
+               public bool IgnoreExpiration
+               {
+                       get { return ignoreExpiration; }
+                       set { ignoreExpiration = value; }
+               }
 
                #endregion
 
@@ -335,7 +348,7 @@ namespace Apache.NMS.ActiveMQ
                                this.session.Connection.Oneway(removeCommand);
                                this.session = null;
 
-                Tracer.Debug("Consumer instnace Closed.");
+                Tracer.Debug("Consumer instance Closed.");
             }
                }
 
@@ -541,7 +554,7 @@ namespace Apache.NMS.ActiveMQ
 
                                                        try
                                                        {
-                                                               bool expired = 
message.IsExpired();
+                                                               bool expired = 
(!IgnoreExpiration && message.IsExpired());
 
                                                                if(!expired)
                                                                {
@@ -672,7 +685,7 @@ namespace Apache.NMS.ActiveMQ
                                {
                                        return null;
                                }
-                               else if(dispatch.Message.IsExpired())
+                               else if(!IgnoreExpiration && 
dispatch.Message.IsExpired())
                                {
                                        Tracer.DebugFormat("{0} received 
expired message: {1}", info.ConsumerId, dispatch.Message.MessageId);
 
@@ -727,7 +740,7 @@ namespace Apache.NMS.ActiveMQ
                                return;
                        }
 
-                       if(expired == true)
+                       if(expired)
                        {
                                lock(this.dispatchedMessages)
                                {

Copied: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageConsumerTest.cs
 (from r982821, 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/VirtualTopicTest.cs)
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageConsumerTest.cs?p2=activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageConsumerTest.cs&p1=activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/VirtualTopicTest.cs&r1=982821&r2=984268&rev=984268&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/VirtualTopicTest.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageConsumerTest.cs
 Tue Aug 10 23:53:33 2010
@@ -18,172 +18,116 @@
 using System.Threading;
 using Apache.NMS.Test;
 using NUnit.Framework;
+using Apache.NMS.ActiveMQ.Commands;
+using System;
+using Apache.NMS.Util;
 
 namespace Apache.NMS.ActiveMQ.Test
 {
-       [TestFixture]
-       public class VirtualTopicTest : NMSTestSupport
+       public enum ExpirationOptions
        {
-               protected static string DESTINATION_NAME = "TestDestination";
-               protected static string PRODUCER_DESTINATION_NAME = 
"VirtualTopic." + DESTINATION_NAME;
-               protected static string CONSUMER_A_DESTINATION_NAME = 
"Consumer.A." + PRODUCER_DESTINATION_NAME;
-               protected static string CONSUMER_B_DESTINATION_NAME = 
"Consumer.B." + PRODUCER_DESTINATION_NAME;
-               protected static string TEST_CLIENT_ID = 
"VirtualTopicTestClientId";
+               DEFAULT,
+               IGNORE,
+               DO_NOT_IGNORE
+       }
 
-               protected const int totalMsgs = 5;
+       [TestFixture]
+       public class MessageConsumerTest : NMSTestSupport
+       {
+               protected static string DESTINATION_NAME = 
"queue://TestDestination";
+               protected static string TEST_CLIENT_ID = 
"MessageConsumerTestClientId";
 
                [Test]
-               public void SendReceiveVirtualTopicMessage(
+               public void TestReceiveIgnoreExpirationMessage(
                        [Values(AcknowledgementMode.AutoAcknowledge, 
AcknowledgementMode.ClientAcknowledge,
                                AcknowledgementMode.DupsOkAcknowledge, 
AcknowledgementMode.Transactional)]
                        AcknowledgementMode ackMode,
                        [Values(MsgDeliveryMode.NonPersistent, 
MsgDeliveryMode.Persistent)]
-                       MsgDeliveryMode deliveryMode)
+                       MsgDeliveryMode deliveryMode,
+                       [Values(ExpirationOptions.DEFAULT, 
ExpirationOptions.IGNORE, ExpirationOptions.DO_NOT_IGNORE)]
+                       ExpirationOptions expirationOption)
                {
                        using(IConnection connection = 
CreateConnection(TEST_CLIENT_ID))
                        {
                                connection.Start();
-                               using(ISession session = 
connection.CreateSession(ackMode))
+                               using(Session session = 
connection.CreateSession(ackMode) as Session)
                                {
-                                       using(IMessageConsumer consumerA = 
session.CreateConsumer(session.GetQueue(CONSUMER_A_DESTINATION_NAME)))
-                                       using(IMessageConsumer consumerB = 
session.CreateConsumer(session.GetQueue(CONSUMER_B_DESTINATION_NAME)))
-                                       using(IMessageProducer producer = 
session.CreateProducer(session.GetTopic(PRODUCER_DESTINATION_NAME)))
+                                       string destinationName = 
DESTINATION_NAME;
+
+                                       if(ExpirationOptions.IGNORE == 
expirationOption)
+                                       {
+                                               destinationName += 
"?consumer.nms.ignoreExpiration=true";
+                                       }
+                                       else if(ExpirationOptions.DO_NOT_IGNORE 
== expirationOption)
                                        {
-                                               producer.RequestTimeout = 
receiveTimeout;
-                                               producer.DeliveryMode = 
deliveryMode;
+                                               destinationName += 
"?consumer.nms.ignoreExpiration=false";
+                                       }
 
-                                               for(int index = 0; index < 
totalMsgs; index++)
-                                               {
-                                                       string msgText = 
"Message #" + index;
-                                                       Tracer.Info("Sending: " 
+ msgText);
-                                                       
producer.Send(session.CreateTextMessage(msgText));
-                                               }
+                                       try
+                                       {
+                                               IDestination destination = 
SessionUtil.GetDestination(session, destinationName);
 
-                                               
if(AcknowledgementMode.Transactional == ackMode)
+                                               using(IMessageConsumer consumer 
= session.CreateConsumer(destination))
+                                               using(IMessageProducer producer 
= session.CreateProducer(destination))
                                                {
-                                                       session.Commit();
-                                               }
+                                                       producer.RequestTimeout 
= receiveTimeout;
+                                                       producer.DeliveryMode = 
deliveryMode;
 
-                                               for(int index = 0; index < 
totalMsgs; index++)
-                                               {
-                                                       string msgText = 
"Message #" + index;
-                                                       ITextMessage messageA = 
consumerA.Receive(receiveTimeout) as ITextMessage;
-                                                       
Assert.IsNotNull(messageA, "Did not receive message for consumer A.");
-                                                       messageA.Acknowledge();
-                                                       Tracer.Info("Received 
A: " + msgText);
-
-                                                       ITextMessage messageB = 
consumerB.Receive(receiveTimeout) as ITextMessage;
-                                                       
Assert.IsNotNull(messageB, "Did not receive message for consumer B.");
-                                                       messageB.Acknowledge();
-                                                       Tracer.Info("Received 
B: " + msgText);
+                                                       string msgText = 
"ExpiredMessage:" + Guid.NewGuid().ToString();
 
-                                                       
Assert.AreEqual(msgText, messageA.Text, "Message text A does not match.");
-                                                       
Assert.AreEqual(msgText, messageB.Text, "Message text B does not match.");
-                                               }
+                                                       ActiveMQTextMessage msg 
= session.CreateTextMessage(msgText) as ActiveMQTextMessage;
 
-                                               
if(AcknowledgementMode.Transactional == ackMode)
-                                               {
-                                                       session.Commit();
-                                               }
-                                       }
+                                                       // Give it two seconds to live.
+                                                       msg.NMSTimeToLive = 
TimeSpan.FromMilliseconds(2000);
 
-                    // Give the Broker some time to remove the subscriptions.
-                    Thread.Sleep(2000);
+                                                       producer.Send(msg);
 
-                    try
-                    {
-                        ((Session) 
session).DeleteDestination(session.GetQueue(CONSUMER_A_DESTINATION_NAME));
-                        ((Session) 
session).DeleteDestination(session.GetQueue(CONSUMER_B_DESTINATION_NAME));
-                    }
-                    catch
-                    {
-                    }
-                               }
-                       }
-               }
+                                                       
if(AcknowledgementMode.Transactional == ackMode)
+                                                       {
+                                                               
session.Commit();
+                                                       }
 
-               protected int receivedA;
-               protected int receivedB;
+                                                       // Wait for four seconds before processing it.  The broker will have sent it to our local
+                                                       // client dispatch queue, but we won't attempt to process the message until it has had
+                                                       // a chance to expire within our internal queue system.
+                                                       Thread.Sleep(4000);
 
-               [Test]
-               // Do not use listeners with transactional processing.
-               public void AsyncSendReceiveVirtualTopicMessage(
-                       [Values(AcknowledgementMode.AutoAcknowledge, 
AcknowledgementMode.ClientAcknowledge, AcknowledgementMode.DupsOkAcknowledge)]
-                       AcknowledgementMode ackMode,
-                       [Values(MsgDeliveryMode.NonPersistent, 
MsgDeliveryMode.Persistent)]
-                       MsgDeliveryMode deliveryMode)
-               {
-                       receivedA = 0;
-                       receivedB = 0;
+                                                       ActiveMQTextMessage 
rcvMsg = consumer.ReceiveNoWait() as ActiveMQTextMessage;
 
-                       using(IConnection connection = 
CreateConnection(TEST_CLIENT_ID))
-                       {
-                               connection.Start();
-                               using(ISession session = 
connection.CreateSession(ackMode))
-                               {
-                                       using(IMessageConsumer consumerA = 
session.CreateConsumer(session.GetQueue(CONSUMER_A_DESTINATION_NAME)))
-                                       using(IMessageConsumer consumerB = 
session.CreateConsumer(session.GetQueue(CONSUMER_B_DESTINATION_NAME)))
-                                       using(IMessageProducer producer = 
session.CreateProducer(session.GetTopic(PRODUCER_DESTINATION_NAME)))
-                                       {
-                                               producer.RequestTimeout = 
receiveTimeout;
-                                               producer.DeliveryMode = 
deliveryMode;
+                                                       
if(ExpirationOptions.IGNORE == expirationOption)
+                                                       {
+                                                               
Assert.IsNotNull(rcvMsg, "Did not receive expired message.");
+                                                               
rcvMsg.Acknowledge();
 
-                                               consumerA.Listener += 
MessageListenerA;
-                                               consumerB.Listener += 
MessageListenerB;
+                                                               
Assert.AreEqual(msgText, rcvMsg.Text, "Message text does not match.");
+                                                               
Assert.IsTrue(rcvMsg.IsExpired());
 
-                                               for(int index = 0; index < 
totalMsgs; index++)
+                                                               
if(AcknowledgementMode.Transactional == ackMode)
+                                                               {
+                                                                       
session.Commit();
+                                                               }
+                                                       }
+                                                       else
+                                                       {
+                                                               // Should not receive a message.
+                                                               
Assert.IsNull(rcvMsg, "Received an expired message!");
+                                                       }
+                                               }
+                                       }
+                                       finally
+                                       {
+                                               try
                                                {
-                                                       string msgText = 
"Message #" + index;
-                                                       Tracer.Info("Sending: " 
+ msgText);
-                                                       
producer.Send(session.CreateTextMessage(msgText));
+                                                       // Give the Broker some time to remove the subscriptions.
+                                                       Thread.Sleep(2000);
+                                                       
SessionUtil.DeleteDestination(session, destinationName);
                                                }
-
-                                               int waitCount = 0;
-                                               while(receivedA < totalMsgs && 
receivedB < totalMsgs)
+                                               catch
                                                {
-                                                       if(waitCount++ > 50)
-                                                       {
-                                                               
Assert.Fail("Timed out waiting for message consumers.  A = " + receivedA + ", B 
= " + receivedB);
-                                                       }
-
-                                                       Tracer.Info("Waiting... 
Received A = " + receivedA + ", Received B = " + receivedB);
-                                                       Thread.Sleep(250);
                                                }
                                        }
-                    
-                    // Give the Broker some time to remove the subscriptions.
-                    Thread.Sleep(2000);
-
-                    try
-                    {
-                        ((Session) 
session).DeleteDestination(session.GetQueue(CONSUMER_A_DESTINATION_NAME));
-                        ((Session) 
session).DeleteDestination(session.GetQueue(CONSUMER_B_DESTINATION_NAME));
-                                   }
-                    catch
-                    {
-                    }
-                }
+                               }
                        }
                }
-
-               private void MessageListenerA(IMessage message)
-               {
-                       message.Acknowledge();
-                       ITextMessage messageA = message as ITextMessage;
-                       string msgText = "Message #" + receivedA;
-                       Assert.AreEqual(msgText, messageA.Text, "Message text A 
does not match.");
-                       Tracer.Info("Received Listener A: " + msgText);
-                       receivedA++;
-               }
-
-               private void MessageListenerB(IMessage message)
-               {
-                       message.Acknowledge();
-                       ITextMessage messageB = message as ITextMessage;
-                       string msgText = "Message #" + receivedB;
-                       Assert.AreEqual(msgText, messageB.Text, "Message text B 
does not match.");
-                       Tracer.Info("Received Listener B: " + msgText);
-                       receivedB++;
-               }
        }
 }

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq-test.csproj
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq-test.csproj?rev=984268&r1=984267&r2=984268&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq-test.csproj 
(original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq-test.csproj 
Tue Aug 10 23:53:33 2010
@@ -2,7 +2,7 @@
   <PropertyGroup>
     <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
     <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
-    <ProductVersion>9.0.21022</ProductVersion>
+    <ProductVersion>9.0.30729</ProductVersion>
     <SchemaVersion>2.0</SchemaVersion>
     <ProjectGuid>{EB943C69-2C9B-45E7-B95B-FB916E7057ED}</ProjectGuid>
     <OutputType>Library</OutputType>
@@ -74,6 +74,7 @@
     <Compile Include="src\test\csharp\CommonAssemblyInfo.cs" />
     <Compile Include="src\test\csharp\ConnectionFactoryTest.cs" />
     <Compile Include="src\test\csharp\ConnectionMetaDataTest.cs" />
+    <Compile Include="src\test\csharp\MessageConsumerTest.cs" />
     <Compile Include="src\test\csharp\ExclusiveConsumerTest.cs" />
     <Compile Include="src\test\csharp\IndividualAckTest.cs" />
     <Compile Include="src\test\csharp\MessageListenerRedeliveryTest.cs" />

Modified: 
activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs?rev=984268&r1=984267&r2=984268&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs 
(original)
+++ 
activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs 
Tue Aug 10 23:53:33 2010
@@ -233,7 +233,7 @@ namespace Apache.NMS.Util
 
             foreach(string key in props.Keys)
             {
-                if(key.StartsWith(prefix))
+                if(key.StartsWith(prefix, true, CultureInfo.InvariantCulture))
                 {
                     String value = props[key];
                     result[key] = value;


Reply via email to