Author: tabish
Date: Thu Mar 31 22:01:57 2011
New Revision: 1087456
URL: http://svn.apache.org/viewvc?rev=1087456&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQNET-323
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/AMQRedeliveryPolicyTest.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs?rev=1087456&r1=1087455&r2=1087456&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/main/csharp/MessageConsumer.cs
Thu Mar 31 22:01:57 2011
@@ -661,18 +661,7 @@ namespace Apache.NMS.ActiveMQ
MessageDispatch dispatch =
this.unconsumedMessages.DequeueNoWait();
if(dispatch != null)
{
- try
- {
- ActiveMQMessage message =
CreateActiveMQMessage(dispatch);
-
BeforeMessageIsConsumed(dispatch);
- listener(message);
-
AfterMessageIsConsumed(dispatch, false);
- }
- catch(NMSException e)
- {
-
this.session.Connection.OnSessionException(this.session, e);
- }
-
+ this.Dispatch(dispatch);
return true;
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/AMQRedeliveryPolicyTest.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/AMQRedeliveryPolicyTest.cs?rev=1087456&r1=1087455&r2=1087456&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/AMQRedeliveryPolicyTest.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/branches/1.5.x/src/test/csharp/AMQRedeliveryPolicyTest.cs
Thu Mar 31 22:01:57 2011
@@ -16,6 +16,7 @@
*/
using System;
+using System.Threading;
using Apache.NMS.Test;
using Apache.NMS.ActiveMQ.Commands;
using NUnit.Framework;
@@ -343,5 +344,104 @@ namespace Apache.NMS.ActiveMQ.Test
}
}
+ [Test]
+ public void TestNornalRedeliveryPolicyOnRollbackUntilTimeToLive()
+ {
+ using(Connection connection = (Connection) CreateConnection())
+ {
+ IRedeliveryPolicy policy = connection.RedeliveryPolicy;
+ policy.MaximumRedeliveries = -1;
+ policy.InitialRedeliveryDelay = 500;
+ policy.UseExponentialBackOff = false;
+
+ connection.Start();
+ ISession session =
connection.CreateSession(AcknowledgementMode.Transactional);
+ IDestination destination = session.CreateTemporaryQueue();
+
+ IMessageProducer producer =
session.CreateProducer(destination);
+ IMessageConsumer consumer =
session.CreateConsumer(destination);
+
+ // Send the messages
+ ITextMessage textMessage = session.CreateTextMessage("1st");
+ textMessage.NMSTimeToLive = TimeSpan.FromMilliseconds(800.0);
+ producer.Send(textMessage);
+ session.Commit();
+
+ ITextMessage m;
+ m =
(ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("1st", m.Text);
+ session.Rollback();
+
+ // No delay on first Rollback..
+ m =
(ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
+ Assert.IsNotNull(m);
+ session.Rollback();
+
+ // Show subsequent re-delivery delay is incrementing.
+ m =
(ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
+ Assert.IsNull(m);
+ m =
(ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700));
+ Assert.IsNotNull(m);
+ Assert.AreEqual("1st", m.Text);
+ session.Rollback();
+
+ // The message gets redelivered after 500 ms every time since
+ // we are not using exponential backoff.
+ m =
(ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700));
+ Assert.IsNull(m);
+ }
+ }
+
+ [Test]
+ public void
TestNornalRedeliveryPolicyOnRollbackUntilTimeToLiveCallback()
+ {
+ using(Connection connection = (Connection) CreateConnection())
+ {
+ IRedeliveryPolicy policy = connection.RedeliveryPolicy;
+ policy.MaximumRedeliveries = -1;
+ policy.InitialRedeliveryDelay = 500;
+ policy.UseExponentialBackOff = false;
+
+ connection.Start();
+ ISession session =
connection.CreateSession(AcknowledgementMode.Transactional);
+ IDestination destination = session.CreateTemporaryQueue();
+
+ IMessageProducer producer =
session.CreateProducer(destination);
+ IMessageConsumer consumer =
session.CreateConsumer(destination);
+ CallbackClass cc = new CallbackClass(session);
+ consumer.Listener += new MessageListener(cc.consumer_Listener);
+
+ // Send the messages
+ ITextMessage textMessage = session.CreateTextMessage("1st");
+ textMessage.NMSTimeToLive = TimeSpan.FromMilliseconds(800.0);
+ producer.Send(textMessage,
MsgDeliveryMode.Persistent,MsgPriority.Normal,TimeSpan.FromMilliseconds(800.0));
+ session.Commit();
+
+ // sends normal message, then immediate retry, then retry
after 500 ms, then expire.
+ Thread.Sleep(2000);
+ Assert.AreEqual(3, cc.numReceived);
+ }
+ }
+
+ class CallbackClass
+ {
+ private ISession session;
+ public int numReceived = 0;
+
+ public CallbackClass(ISession session)
+ {
+ this.session = session;
+ }
+
+ public void consumer_Listener(IMessage message)
+ {
+ numReceived++;
+ ITextMessage m = message as ITextMessage;
+ Assert.IsNotNull(m);
+ Assert.AreEqual("1st", m.Text);
+ session.Rollback();
+ }
+ }
}
}