Author: jgomes
Date: Tue Aug 10 23:53:33 2010
New Revision: 984268
URL: http://svn.apache.org/viewvc?rev=984268&view=rev
Log:
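Added support for processing expired messages that were received from the
broker. The behavior is opt-in: set the consumer's new IgnoreExpiration
property (default false), e.g. via the destination URI option
"consumer.nms.ignoreExpiration=true".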
Fixes [AMQNET-268]. (See https://issues.apache.org/activemq/browse/AMQNET-268)
Added:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageConsumerTest.cs
- copied, changed from r982821,
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/VirtualTopicTest.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq-test.csproj
activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=984268&r1=984267&r2=984268&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
Tue Aug 10 23:53:33 2010
@@ -97,9 +97,14 @@ namespace Apache.NMS.ActiveMQ
// If the destination contained a URI query, then use it to set
public properties
// on the ConsumerInfo
if(destination.Options != null)
- {
+ {
+ // Get options prefixed with "consumer.*"
StringDictionary options =
URISupport.GetProperties(destination.Options, "consumer.");
+ // Extract out custom extension options
"consumer.nms.*"
+ StringDictionary customConsumerOptions =
URISupport.ExtractProperties(options, "nms.");
+
URISupport.SetProperties(this.info, options);
+ URISupport.SetProperties(this,
customConsumerOptions, "nms.");
}
}
@@ -145,7 +150,15 @@ namespace Apache.NMS.ActiveMQ
public long UnconsumedMessageCount
{
get { return this.unconsumedMessages.Count; }
- }
+ }
+
+ // Custom Options
+ private bool ignoreExpiration = false;
+ public bool IgnoreExpiration
+ {
+ get { return ignoreExpiration; }
+ set { ignoreExpiration = value; }
+ }
#endregion
@@ -335,7 +348,7 @@ namespace Apache.NMS.ActiveMQ
this.session.Connection.Oneway(removeCommand);
this.session = null;
- Tracer.Debug("Consumer instnace Closed.");
+ Tracer.Debug("Consumer instance Closed.");
}
}
@@ -541,7 +554,7 @@ namespace Apache.NMS.ActiveMQ
try
{
- bool expired =
message.IsExpired();
+ bool expired =
(!IgnoreExpiration && message.IsExpired());
if(!expired)
{
@@ -672,7 +685,7 @@ namespace Apache.NMS.ActiveMQ
{
return null;
}
- else if(dispatch.Message.IsExpired())
+ else if(!IgnoreExpiration &&
dispatch.Message.IsExpired())
{
Tracer.DebugFormat("{0} received
expired message: {1}", info.ConsumerId, dispatch.Message.MessageId);
@@ -727,7 +740,7 @@ namespace Apache.NMS.ActiveMQ
return;
}
- if(expired == true)
+ if(expired)
{
lock(this.dispatchedMessages)
{
Copied:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageConsumerTest.cs
(from r982821,
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/VirtualTopicTest.cs)
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageConsumerTest.cs?p2=activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageConsumerTest.cs&p1=activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/VirtualTopicTest.cs&r1=982821&r2=984268&rev=984268&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/VirtualTopicTest.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/MessageConsumerTest.cs
Tue Aug 10 23:53:33 2010
@@ -18,172 +18,116 @@
using System.Threading;
using Apache.NMS.Test;
using NUnit.Framework;
+using Apache.NMS.ActiveMQ.Commands;
+using System;
+using Apache.NMS.Util;
namespace Apache.NMS.ActiveMQ.Test
{
- [TestFixture]
- public class VirtualTopicTest : NMSTestSupport
+ public enum ExpirationOptions
{
- protected static string DESTINATION_NAME = "TestDestination";
- protected static string PRODUCER_DESTINATION_NAME =
"VirtualTopic." + DESTINATION_NAME;
- protected static string CONSUMER_A_DESTINATION_NAME =
"Consumer.A." + PRODUCER_DESTINATION_NAME;
- protected static string CONSUMER_B_DESTINATION_NAME =
"Consumer.B." + PRODUCER_DESTINATION_NAME;
- protected static string TEST_CLIENT_ID =
"VirtualTopicTestClientId";
+ DEFAULT,
+ IGNORE,
+ DO_NOT_IGNORE
+ }
- protected const int totalMsgs = 5;
+ [TestFixture]
+ public class MessageConsumerTest : NMSTestSupport
+ {
+ protected static string DESTINATION_NAME =
"queue://TestDestination";
+ protected static string TEST_CLIENT_ID =
"MessageConsumerTestClientId";
[Test]
- public void SendReceiveVirtualTopicMessage(
+ public void TestReceiveIgnoreExpirationMessage(
[Values(AcknowledgementMode.AutoAcknowledge,
AcknowledgementMode.ClientAcknowledge,
AcknowledgementMode.DupsOkAcknowledge,
AcknowledgementMode.Transactional)]
AcknowledgementMode ackMode,
[Values(MsgDeliveryMode.NonPersistent,
MsgDeliveryMode.Persistent)]
- MsgDeliveryMode deliveryMode)
+ MsgDeliveryMode deliveryMode,
+ [Values(ExpirationOptions.DEFAULT,
ExpirationOptions.IGNORE, ExpirationOptions.DO_NOT_IGNORE)]
+ ExpirationOptions expirationOption)
{
using(IConnection connection =
CreateConnection(TEST_CLIENT_ID))
{
connection.Start();
- using(ISession session =
connection.CreateSession(ackMode))
+ using(Session session =
connection.CreateSession(ackMode) as Session)
{
- using(IMessageConsumer consumerA =
session.CreateConsumer(session.GetQueue(CONSUMER_A_DESTINATION_NAME)))
- using(IMessageConsumer consumerB =
session.CreateConsumer(session.GetQueue(CONSUMER_B_DESTINATION_NAME)))
- using(IMessageProducer producer =
session.CreateProducer(session.GetTopic(PRODUCER_DESTINATION_NAME)))
+ string destinationName =
DESTINATION_NAME;
+
+ if(ExpirationOptions.IGNORE ==
expirationOption)
+ {
+ destinationName +=
"?consumer.nms.ignoreExpiration=true";
+ }
+ else if(ExpirationOptions.DO_NOT_IGNORE
== expirationOption)
{
- producer.RequestTimeout =
receiveTimeout;
- producer.DeliveryMode =
deliveryMode;
+ destinationName +=
"?consumer.nms.ignoreExpiration=false";
+ }
- for(int index = 0; index <
totalMsgs; index++)
- {
- string msgText =
"Message #" + index;
- Tracer.Info("Sending: "
+ msgText);
-
producer.Send(session.CreateTextMessage(msgText));
- }
+ try
+ {
+ IDestination destination =
SessionUtil.GetDestination(session, destinationName);
-
if(AcknowledgementMode.Transactional == ackMode)
+ using(IMessageConsumer consumer
= session.CreateConsumer(destination))
+ using(IMessageProducer producer
= session.CreateProducer(destination))
{
- session.Commit();
- }
+ producer.RequestTimeout
= receiveTimeout;
+ producer.DeliveryMode =
deliveryMode;
- for(int index = 0; index <
totalMsgs; index++)
- {
- string msgText =
"Message #" + index;
- ITextMessage messageA =
consumerA.Receive(receiveTimeout) as ITextMessage;
-
Assert.IsNotNull(messageA, "Did not receive message for consumer A.");
- messageA.Acknowledge();
- Tracer.Info("Received
A: " + msgText);
-
- ITextMessage messageB =
consumerB.Receive(receiveTimeout) as ITextMessage;
-
Assert.IsNotNull(messageB, "Did not receive message for consumer B.");
- messageB.Acknowledge();
- Tracer.Info("Received
B: " + msgText);
+ string msgText =
"ExpiredMessage:" + Guid.NewGuid().ToString();
-
Assert.AreEqual(msgText, messageA.Text, "Message text A does not match.");
-
Assert.AreEqual(msgText, messageB.Text, "Message text B does not match.");
- }
+ ActiveMQTextMessage msg
= session.CreateTextMessage(msgText) as ActiveMQTextMessage;
-
if(AcknowledgementMode.Transactional == ackMode)
- {
- session.Commit();
- }
- }
+ // Give it two seconds
to live.
+ msg.NMSTimeToLive =
TimeSpan.FromMilliseconds(2000);
- // Give the Broker some time to remove the subscriptions.
- Thread.Sleep(2000);
+ producer.Send(msg);
- try
- {
- ((Session)
session).DeleteDestination(session.GetQueue(CONSUMER_A_DESTINATION_NAME));
- ((Session)
session).DeleteDestination(session.GetQueue(CONSUMER_B_DESTINATION_NAME));
- }
- catch
- {
- }
- }
- }
- }
+
if(AcknowledgementMode.Transactional == ackMode)
+ {
+
session.Commit();
+ }
- protected int receivedA;
- protected int receivedB;
+ // Wait for four
seconds before processing it. The broker will have sent it to our local
+ // client dispatch
queue, but we won't attempt to process the message until it has had
+ // a chance to expire
within our internal queue system.
+ Thread.Sleep(4000);
- [Test]
- // Do not use listeners with transactional processing.
- public void AsyncSendReceiveVirtualTopicMessage(
- [Values(AcknowledgementMode.AutoAcknowledge,
AcknowledgementMode.ClientAcknowledge, AcknowledgementMode.DupsOkAcknowledge)]
- AcknowledgementMode ackMode,
- [Values(MsgDeliveryMode.NonPersistent,
MsgDeliveryMode.Persistent)]
- MsgDeliveryMode deliveryMode)
- {
- receivedA = 0;
- receivedB = 0;
+ ActiveMQTextMessage
rcvMsg = consumer.ReceiveNoWait() as ActiveMQTextMessage;
- using(IConnection connection =
CreateConnection(TEST_CLIENT_ID))
- {
- connection.Start();
- using(ISession session =
connection.CreateSession(ackMode))
- {
- using(IMessageConsumer consumerA =
session.CreateConsumer(session.GetQueue(CONSUMER_A_DESTINATION_NAME)))
- using(IMessageConsumer consumerB =
session.CreateConsumer(session.GetQueue(CONSUMER_B_DESTINATION_NAME)))
- using(IMessageProducer producer =
session.CreateProducer(session.GetTopic(PRODUCER_DESTINATION_NAME)))
- {
- producer.RequestTimeout =
receiveTimeout;
- producer.DeliveryMode =
deliveryMode;
+
if(ExpirationOptions.IGNORE == expirationOption)
+ {
+
Assert.IsNotNull(rcvMsg, "Did not receive expired message.");
+
rcvMsg.Acknowledge();
- consumerA.Listener +=
MessageListenerA;
- consumerB.Listener +=
MessageListenerB;
+
Assert.AreEqual(msgText, rcvMsg.Text, "Message text does not match.");
+
Assert.IsTrue(rcvMsg.IsExpired());
- for(int index = 0; index <
totalMsgs; index++)
+
if(AcknowledgementMode.Transactional == ackMode)
+ {
+
session.Commit();
+ }
+ }
+ else
+ {
+ // Should not
receive a message.
+
Assert.IsNull(rcvMsg, "Received an expired message!");
+ }
+ }
+ }
+ finally
+ {
+ try
{
- string msgText =
"Message #" + index;
- Tracer.Info("Sending: "
+ msgText);
-
producer.Send(session.CreateTextMessage(msgText));
+ // Give the Broker some
time to remove the subscriptions.
+ Thread.Sleep(2000);
+
SessionUtil.DeleteDestination(session, destinationName);
}
-
- int waitCount = 0;
- while(receivedA < totalMsgs &&
receivedB < totalMsgs)
+ catch
{
- if(waitCount++ > 50)
- {
-
Assert.Fail("Timed out waiting for message consumers. A = " + receivedA + ", B
= " + receivedB);
- }
-
- Tracer.Info("Waiting...
Received A = " + receivedA + ", Received B = " + receivedB);
- Thread.Sleep(250);
}
}
-
- // Give the Broker some time to remove the subscriptions.
- Thread.Sleep(2000);
-
- try
- {
- ((Session)
session).DeleteDestination(session.GetQueue(CONSUMER_A_DESTINATION_NAME));
- ((Session)
session).DeleteDestination(session.GetQueue(CONSUMER_B_DESTINATION_NAME));
- }
- catch
- {
- }
- }
+ }
}
}
-
- private void MessageListenerA(IMessage message)
- {
- message.Acknowledge();
- ITextMessage messageA = message as ITextMessage;
- string msgText = "Message #" + receivedA;
- Assert.AreEqual(msgText, messageA.Text, "Message text A
does not match.");
- Tracer.Info("Received Listener A: " + msgText);
- receivedA++;
- }
-
- private void MessageListenerB(IMessage message)
- {
- message.Acknowledge();
- ITextMessage messageB = message as ITextMessage;
- string msgText = "Message #" + receivedB;
- Assert.AreEqual(msgText, messageB.Text, "Message text B
does not match.");
- Tracer.Info("Received Listener B: " + msgText);
- receivedB++;
- }
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq-test.csproj
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq-test.csproj?rev=984268&r1=984267&r2=984268&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq-test.csproj
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/vs2008-activemq-test.csproj
Tue Aug 10 23:53:33 2010
@@ -2,7 +2,7 @@
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
- <ProductVersion>9.0.21022</ProductVersion>
+ <ProductVersion>9.0.30729</ProductVersion>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{EB943C69-2C9B-45E7-B95B-FB916E7057ED}</ProjectGuid>
<OutputType>Library</OutputType>
@@ -74,6 +74,7 @@
<Compile Include="src\test\csharp\CommonAssemblyInfo.cs" />
<Compile Include="src\test\csharp\ConnectionFactoryTest.cs" />
<Compile Include="src\test\csharp\ConnectionMetaDataTest.cs" />
+ <Compile Include="src\test\csharp\MessageConsumerTest.cs" />
<Compile Include="src\test\csharp\ExclusiveConsumerTest.cs" />
<Compile Include="src\test\csharp\IndividualAckTest.cs" />
<Compile Include="src\test\csharp\MessageListenerRedeliveryTest.cs" />
Modified:
activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs?rev=984268&r1=984267&r2=984268&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS/trunk/src/main/csharp/Util/URISupport.cs
Tue Aug 10 23:53:33 2010
@@ -233,7 +233,7 @@ namespace Apache.NMS.Util
foreach(string key in props.Keys)
{
- if(key.StartsWith(prefix))
+ if(key.StartsWith(prefix, true, CultureInfo.InvariantCulture))
{
String value = props[key];
result[key] = value;