Author: tabish
Date: Thu Mar 31 21:59:56 2011
New Revision: 1087454

URL: http://svn.apache.org/viewvc?rev=1087454&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQNET-323

Modified:
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1087454&r1=1087453&r2=1087454&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
 Thu Mar 31 21:59:56 2011
@@ -661,18 +661,7 @@ namespace Apache.NMS.ActiveMQ
                                MessageDispatch dispatch = 
this.unconsumedMessages.DequeueNoWait();
                                if(dispatch != null)
                                {
-                                       try
-                                       {
-                                               ActiveMQMessage message = 
CreateActiveMQMessage(dispatch);
-                                               
BeforeMessageIsConsumed(dispatch);
-                                               listener(message);
-                                               
AfterMessageIsConsumed(dispatch, false);
-                                       }
-                                       catch(NMSException e)
-                                       {
-                                               
this.session.Connection.OnSessionException(this.session, e);
-                                       }
-
+                                       this.Dispatch(dispatch);
                                        return true;
                                }
                        }

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs?rev=1087454&r1=1087453&r2=1087454&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/AMQRedeliveryPolicyTest.cs
 Thu Mar 31 21:59:56 2011
@@ -16,6 +16,7 @@
  */
 
 using System;
+using System.Threading;
 using Apache.NMS.Test;
 using Apache.NMS.ActiveMQ.Commands;
 using NUnit.Framework;
@@ -343,5 +344,104 @@ namespace Apache.NMS.ActiveMQ.Test
             }
         }
 
+        [Test]
+        public void TestNornalRedeliveryPolicyOnRollbackUntilTimeToLive()
+        {
+            using(Connection connection = (Connection) CreateConnection())
+            {
+                IRedeliveryPolicy policy = connection.RedeliveryPolicy;
+                policy.MaximumRedeliveries = -1;
+                policy.InitialRedeliveryDelay = 500;
+                policy.UseExponentialBackOff = false;
+
+                connection.Start();
+                ISession session = 
connection.CreateSession(AcknowledgementMode.Transactional);
+                IDestination destination = session.CreateTemporaryQueue();
+
+                IMessageProducer producer = 
session.CreateProducer(destination);
+                IMessageConsumer consumer = 
session.CreateConsumer(destination);
+
+                // Send the messages
+                ITextMessage textMessage = session.CreateTextMessage("1st");
+                textMessage.NMSTimeToLive = TimeSpan.FromMilliseconds(800.0);
+                producer.Send(textMessage);
+                session.Commit();
+
+                ITextMessage m;
+                m = 
(ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(1000));
+                Assert.IsNotNull(m);
+                Assert.AreEqual("1st", m.Text);
+                session.Rollback();
+
+                // No delay on first Rollback..
+                m = 
(ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
+                Assert.IsNotNull(m);
+                session.Rollback();
+
+                // Show subsequent re-delivery delay is incrementing.
+                m = 
(ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(100));
+                Assert.IsNull(m);
+                m = 
(ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700));
+                Assert.IsNotNull(m);
+                Assert.AreEqual("1st", m.Text);
+                session.Rollback();
+
+                // The message gets redelivered after 500 ms every time since
+                // we are not using exponential backoff.
+                m = 
(ITextMessage)consumer.Receive(TimeSpan.FromMilliseconds(700));
+                Assert.IsNull(m);
+            }
+        }
+
+        [Test]
+        public void 
TestNornalRedeliveryPolicyOnRollbackUntilTimeToLiveCallback()
+        {
+            using(Connection connection = (Connection) CreateConnection())
+            {
+                IRedeliveryPolicy policy = connection.RedeliveryPolicy;
+                policy.MaximumRedeliveries = -1;
+                policy.InitialRedeliveryDelay = 500;
+                policy.UseExponentialBackOff = false;
+
+                connection.Start();
+                ISession session = 
connection.CreateSession(AcknowledgementMode.Transactional);
+                IDestination destination = session.CreateTemporaryQueue();
+
+                IMessageProducer producer = 
session.CreateProducer(destination);
+                IMessageConsumer consumer = 
session.CreateConsumer(destination);
+                CallbackClass cc = new CallbackClass(session);
+                consumer.Listener += new MessageListener(cc.consumer_Listener);
+
+                // Send the messages
+                ITextMessage textMessage = session.CreateTextMessage("1st");
+                textMessage.NMSTimeToLive = TimeSpan.FromMilliseconds(800.0);
+                producer.Send(textMessage, 
MsgDeliveryMode.Persistent,MsgPriority.Normal,TimeSpan.FromMilliseconds(800.0));
+                session.Commit();
+
+                // sends normal message, then immediate retry, then retry 
after 500 ms, then expire.
+                Thread.Sleep(2000);
+                Assert.AreEqual(3, cc.numReceived);
+            }
+        }
+
+        class CallbackClass
+        {
+            private ISession session;
+            public int numReceived = 0;
+
+            public CallbackClass(ISession session)
+            {
+                this.session = session;
+            }
+
+            public void consumer_Listener(IMessage message)
+            {
+                numReceived++;
+                ITextMessage m = message as ITextMessage;
+                Assert.IsNotNull(m);
+                Assert.AreEqual("1st", m.Text);
+                session.Rollback();
+            }
+        }
     }
 }


Reply via email to