Author: tabish
Date: Fri Apr 26 19:26:46 2013
New Revision: 1476352

URL: http://svn.apache.org/r1476352
Log:
fix: https://issues.apache.org/jira/browse/AMQNET-410

Added:
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NonBlockingConsumerRedeliveryTest.cs
   (with props)
Modified:
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1476352&r1=1476351&r2=1476352&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
 Fri Apr 26 19:26:46 2013
@@ -1405,6 +1405,12 @@ namespace Apache.NMS.ActiveMQ
                                                // We need to NACK the messages 
so that they get sent to the DLQ.
                        MessageAck ack = new MessageAck(lastMd, (byte) 
AckType.PoisonAck, dispatchedMessages.Count);
                        
+                        if(Tracer.IsDebugEnabled)
+                        {
+                                                       
Tracer.DebugFormat("Consumer {0} Poison Ack of {1} messages aft max 
redeliveries: {2}",
+                                               this.info.ConsumerId, 
this.dispatchedMessages.Count, this.redeliveryPolicy.MaximumRedeliveries);
+                        }
+
                                                if (lastMd.RollbackCause != 
null)
                                                {
                                                        BrokerError cause = new 
BrokerError();
@@ -1433,31 +1439,59 @@ namespace Apache.NMS.ActiveMQ
                                                        
this.session.SendAck(ack);
                                                }
 
-                                               // stop the delivery of 
messages.
-                                               this.unconsumedMessages.Stop();
-
-                        if(Tracer.IsDebugEnabled)
-                        {
-                            Tracer.DebugFormat("Consumer {0} Rolled Back, 
Re-enque {1} messages",
-                                               this.info.ConsumerId, 
this.dispatchedMessages.Count);
-                        }
-
-                                               foreach(MessageDispatch 
dispatch in this.dispatchedMessages)
+                                               if (this.nonBlockingRedelivery)
                                                {
-                            this.unconsumedMessages.EnqueueFirst(dispatch);
-                                               }
-
-                                               this.deliveredCounter -= 
this.dispatchedMessages.Count;
-                                               this.dispatchedMessages.Clear();
+                                                       if(redeliveryDelay == 0)
+                                                       {
+                                                               redeliveryDelay 
= RedeliveryPolicy.InitialRedeliveryDelay;
+                                                       }
 
-                                               if(redeliveryDelay > 0 && 
!this.unconsumedMessages.Closed)
-                                               {
-                                                       DateTime deadline = 
DateTime.Now.AddMilliseconds(redeliveryDelay);
-                                                       
ThreadPool.QueueUserWorkItem(this.RollbackHelper, deadline);
+                               if(Tracer.IsDebugEnabled)
+                               {
+                                                               
Tracer.DebugFormat("Consumer {0} Rolled Back, Re-enque {1} messages in 
Non-Blocking mode, delay: {2}",
+                                                      this.info.ConsumerId, 
this.dispatchedMessages.Count, redeliveryDelay);
+                               }
+
+                            List<MessageDispatch> pendingRedeliveries =
+                                new 
List<MessageDispatch>(this.dispatchedMessages);
+                                                       
pendingRedeliveries.Reverse();
+
+                                                       this.deliveredCounter 
-= this.dispatchedMessages.Count;
+                                                       
this.dispatchedMessages.Clear();
+
+                                                       
this.session.Scheduler.ExecuteAfterDelay(
+                                                               
NonBlockingRedeliveryCallback, 
+                                                               
pendingRedeliveries, 
+                                                               
TimeSpan.FromMilliseconds(redeliveryDelay));
                                                }
-                                               else
+                                               else 
                                                {
-                                                       Start();
+                                                       // stop the delivery of 
messages.
+                                                       
this.unconsumedMessages.Stop();
+
+                               if(Tracer.IsDebugEnabled)
+                               {
+                                   Tracer.DebugFormat("Consumer {0} Rolled 
Back, Re-enque {1} messages",
+                                                      this.info.ConsumerId, 
this.dispatchedMessages.Count);
+                               }
+
+                                                       foreach(MessageDispatch 
dispatch in this.dispatchedMessages)
+                                                       {
+                                   
this.unconsumedMessages.EnqueueFirst(dispatch);
+                                                       }
+
+                                                       this.deliveredCounter 
-= this.dispatchedMessages.Count;
+                                                       
this.dispatchedMessages.Clear();
+
+                                                       if(redeliveryDelay > 0 
&& !this.unconsumedMessages.Closed)
+                                                       {
+                                                               DateTime 
deadline = DateTime.Now.AddMilliseconds(redeliveryDelay);
+                                                               
ThreadPool.QueueUserWorkItem(this.RollbackHelper, deadline);
+                                                       }
+                                                       else
+                                                       {
+                                                               Start();
+                                                       }
                                                }
                                        }
                                }
@@ -1493,6 +1527,26 @@ namespace Apache.NMS.ActiveMQ
                        }
                }
 
+        /// <summary>
+        /// Scheduler callback used for non-blocking redelivery: hands every
+        /// pending MessageDispatch back to the session so that it is delivered
+        /// again.  Nothing is dispatched when the unconsumed message channel
+        /// has already been closed.
+        /// </summary>
+        /// <param name="arg">
+        /// The state object given to the scheduler; must be the
+        /// List of MessageDispatch instances awaiting redelivery.
+        /// </param>
+        private void NonBlockingRedeliveryCallback(object arg)
+        {
+            try
+            {
+                if (this.unconsumedMessages.Closed)
+                {
+                    return;
+                }
+
+                List<MessageDispatch> pendingRedeliveries = arg as List<MessageDispatch>;
+                if (pendingRedeliveries == null)
+                {
+                    // Fail with a descriptive error rather than a bare
+                    // NullReferenceException from the foreach below.
+                    throw new ArgumentException(
+                        "Non-blocking redelivery expects a List<MessageDispatch> as its state object.",
+                        "arg");
+                }
+
+                foreach (MessageDispatch dispatch in pendingRedeliveries)
+                {
+                    session.Dispatch(dispatch);
+                }
+            }
+            catch (Exception e)
+            {
+                // This runs outside any caller that could handle the failure,
+                // so report it through the connection's async error path.
+                session.Connection.OnAsyncException(e);
+            }
+        }
+
                private ActiveMQMessage CreateActiveMQMessage(MessageDispatch 
dispatch)
                {
                        ActiveMQMessage message = dispatch.Message.Clone() as 
ActiveMQMessage;

Added: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NonBlockingConsumerRedeliveryTest.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NonBlockingConsumerRedeliveryTest.cs?rev=1476352&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NonBlockingConsumerRedeliveryTest.cs
 (added)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NonBlockingConsumerRedeliveryTest.cs
 Fri Apr 26 19:26:46 2013
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using Apache.NMS.Test;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.Util;
+using NUnit.Framework;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+       [TestFixture]
+       /// <summary>
+       /// Exercises transacted consumers on a connection that has
+       /// NonBlockingRedelivery enabled (AMQNET-410): messages rolled back by
+       /// the session must be redelivered, in order, after the redelivery
+       /// delay, and end up on the DLQ once the maximum redeliveries is hit.
+       /// </summary>
+       public class NonBlockingConsumerRedeliveryTest : NMSTestSupport
+       {
+           private const int MSG_COUNT = 5;
+
+           // AssertReceived polls every POLL_INTERVAL_MS for at most
+           // MAX_POLLS rounds, i.e. it gives up after 30 seconds.
+           private const int POLL_INTERVAL_MS = 100;
+           private const int MAX_POLLS = 300;
+
+           private Connection connection;
+           private ISession session;
+           private int count;
+
+           // Guards the message lists below: they are filled by the consumer
+           // listeners while the test thread polls and clears them.
+           private readonly object syncRoot = new object();
+
+           private readonly List<IMessage> received = new List<IMessage>();
+           private readonly List<IMessage> dlqed = new List<IMessage>();
+           private readonly List<IMessage> beforeRollback = new List<IMessage>();
+           private readonly List<IMessage> afterRollback = new List<IMessage>();
+
+           [SetUp]
+           public override void SetUp()
+           {
+               base.SetUp();
+
+               session = null;
+               connection = (Connection) CreateConnection();
+               connection.NonBlockingRedelivery = true;
+
+               DeleteDLQ();
+
+               lock (syncRoot)
+               {
+                   received.Clear();
+                   beforeRollback.Clear();
+                   afterRollback.Clear();
+                   dlqed.Clear();
+               }
+               count = 0;
+           }
+
+           [TearDown]
+           public override void TearDown()
+           {
+               try
+               {
+                   if (connection != null)
+                   {
+                       connection.Close();
+                   }
+               }
+               finally
+               {
+                   // Always drop the references and run the base tear down,
+                   // even when closing the connection failed.
+                   connection = null;
+                   session = null;
+                   base.TearDown();
+               }
+           }
+
+           /// <summary>Listener that simply records every message it is given.</summary>
+           public void OnMessage(IMessage message)
+           {
+               lock (syncRoot)
+               {
+                   received.Add(message);
+               }
+           }
+
+           /// <summary>Number of messages currently held in the given list.</summary>
+           private int CountOf(List<IMessage> target)
+           {
+               lock (syncRoot)
+               {
+                   return target.Count;
+               }
+           }
+
+           /// <summary>
+           /// Atomically appends everything received so far to the given list
+           /// and empties the received list.
+           /// </summary>
+           private void MoveReceivedTo(List<IMessage> target)
+           {
+               lock (syncRoot)
+               {
+                   target.AddRange(received);
+                   received.Clear();
+               }
+           }
+
+           /// <summary>Discards everything received so far.</summary>
+           private void ClearReceived()
+           {
+               lock (syncRoot)
+               {
+                   received.Clear();
+               }
+           }
+
+           private void AssertReceived(int count, String message)
+           {
+               AssertReceived(this.received, count, message);
+           }
+
+           /// <summary>
+           /// Waits (up to 30 seconds) until the target list holds exactly
+           /// <paramref name="count"/> messages, then asserts that it does.
+           /// </summary>
+           private void AssertReceived(List<IMessage> target, int count, String message)
+           {
+               for (int i = 0; i < MAX_POLLS && CountOf(target) != count; ++i)
+               {
+                   Thread.Sleep(POLL_INTERVAL_MS);
+               }
+
+               Assert.AreEqual(count, CountOf(target), message);
+           }
+
+           [Test]
+           public void testMessageDeleiveredWhenNonBlockingEnabled()
+           {
+               session = connection.CreateSession(AcknowledgementMode.Transactional);
+               IDestination destination = session.CreateTemporaryQueue();
+               IMessageConsumer consumer = session.CreateConsumer(destination);
+
+               consumer.Listener += OnMessage;
+
+               SendMessages(destination);
+               session.Commit();
+
+               connection.Start();
+
+               AssertReceived(MSG_COUNT, "Pre-Rollback expects to receive: " + MSG_COUNT + " messages.");
+
+               MoveReceivedTo(beforeRollback);
+               session.Rollback();
+
+               AssertReceived(MSG_COUNT, "Post-Rollback expects to receive: " + MSG_COUNT + " messages.");
+
+               MoveReceivedTo(afterRollback);
+
+               Assert.AreEqual(beforeRollback.Count, afterRollback.Count);
+               Assert.AreEqual(beforeRollback, afterRollback);
+               session.Commit();
+           }
+
+           [Test]
+           public void testMessageDeleiveredInCorrectOrder()
+           {
+               session = connection.CreateSession(AcknowledgementMode.Transactional);
+               IDestination destination = session.CreateTemporaryQueue();
+               IMessageConsumer consumer = session.CreateConsumer(destination);
+
+               consumer.Listener += OnMessage;
+
+               SendMessages(destination);
+
+               session.Commit();
+               connection.Start();
+
+               AssertReceived(MSG_COUNT, "Pre-Rollback expects to receive: " + MSG_COUNT + " messages.");
+
+               MoveReceivedTo(beforeRollback);
+               session.Rollback();
+
+               AssertReceived(MSG_COUNT, "Post-Rollback expects to receive: " + MSG_COUNT + " messages.");
+
+               MoveReceivedTo(afterRollback);
+
+               Assert.AreEqual(beforeRollback.Count, afterRollback.Count);
+               Assert.AreEqual(beforeRollback, afterRollback);
+
+               // The message bodies are the send indices, so comparing them
+               // pairwise verifies the redelivery order.
+               for (int i = 0; i < beforeRollback.Count; ++i)
+               {
+                   ITextMessage original = beforeRollback[i] as ITextMessage;
+                   ITextMessage rolledBack = afterRollback[i] as ITextMessage;
+
+                   Assert.IsNotNull(original, "Pre-Rollback message " + i + " is not a text message.");
+                   Assert.IsNotNull(rolledBack, "Post-Rollback message " + i + " is not a text message.");
+
+                   int originalId = Int32.Parse(original.Text);
+                   int rolledBackId = Int32.Parse(rolledBack.Text);
+
+                   Assert.AreEqual(originalId, rolledBackId);
+               }
+
+               session.Commit();
+           }
+
+           [Test]
+           public void testMessageDeleiveryDoesntStop()
+           {
+               session = connection.CreateSession(AcknowledgementMode.Transactional);
+               IDestination destination = session.CreateTemporaryQueue();
+               IMessageConsumer consumer = session.CreateConsumer(destination);
+
+               consumer.Listener += OnMessage;
+
+               SendMessages(destination);
+               connection.Start();
+
+               AssertReceived(MSG_COUNT, "Pre-Rollback expects to receive: " + MSG_COUNT + " messages.");
+
+               MoveReceivedTo(beforeRollback);
+               session.Rollback();
+
+               // New messages must keep flowing while the rolled back ones
+               // are waiting for redelivery.
+               SendMessages(destination);
+
+               AssertReceived(MSG_COUNT * 2, "Post-Rollback expects to receive: " + MSG_COUNT * 2 + " messages.");
+
+               MoveReceivedTo(afterRollback);
+
+               Assert.AreEqual(beforeRollback.Count * 2, afterRollback.Count);
+
+               session.Commit();
+           }
+
+           [Test]
+           public void testNonBlockingMessageDeleiveryIsDelayed()
+           {
+               connection.RedeliveryPolicy.InitialRedeliveryDelay = 7000;
+               session = connection.CreateSession(AcknowledgementMode.Transactional);
+               IDestination destination = session.CreateTemporaryQueue();
+               IMessageConsumer consumer = session.CreateConsumer(destination);
+
+               consumer.Listener += OnMessage;
+
+               SendMessages(destination);
+               connection.Start();
+
+               AssertReceived(MSG_COUNT, "Pre-Rollback expects to receive: " + MSG_COUNT + " messages.");
+
+               ClearReceived();
+               session.Rollback();
+
+               // Well inside the 7 second redelivery delay: nothing may have
+               // been redelivered yet.
+               Thread.Sleep(4000);
+               Assert.IsFalse(CountOf(received) > 0, "Delayed redelivery test not expecting any messages yet.");
+
+               AssertReceived(MSG_COUNT, "Post-Rollback expects to receive: " + MSG_COUNT + " messages.");
+
+               session.Commit();
+               session.Close();
+           }
+
+           /// <summary>
+           /// Listener that commits each message it records and, once more
+           /// than ten calls have been counted, rolls the session back and
+           /// restarts the count instead of recording the message.
+           /// </summary>
+           public void OnMessageWithSomeRollbacks(IMessage message)
+           {
+               if (++count > 10)
+               {
+                   try
+                   {
+                       session.Rollback();
+                       Tracer.Info("Rolling back session.");
+                       count = 0;
+                   }
+                   catch (Exception e)
+                   {
+                       Tracer.WarnFormat("Caught an unexcepted exception: {0}", e.Message);
+                   }
+
+                   return;
+               }
+
+               // Only the list update is locked; the commit runs outside the
+               // lock so the polling test thread is never blocked on it.
+               lock (syncRoot)
+               {
+                   received.Add(message);
+               }
+
+               try
+               {
+                   session.Commit();
+               }
+               catch (Exception e)
+               {
+                   Tracer.WarnFormat("Caught an unexcepted exception: {0}", e.Message);
+               }
+           }
+
+           [Test]
+           public void testNonBlockingMessageDeleiveryWithRollbacks()
+           {
+               session = connection.CreateSession(AcknowledgementMode.Transactional);
+               IDestination destination = session.CreateTemporaryQueue();
+               IMessageConsumer consumer = session.CreateConsumer(destination);
+
+               consumer.Listener += OnMessage;
+
+               SendMessages(destination);
+               connection.Start();
+
+               AssertReceived(MSG_COUNT, "Pre-Rollback expects to receive: " + MSG_COUNT + " messages.");
+
+               ClearReceived();
+
+               consumer.Listener -= OnMessage;
+               consumer.Listener += OnMessageWithSomeRollbacks;
+
+               session.Rollback();
+
+               AssertReceived(MSG_COUNT, "Post-Rollback expects to receive: " + MSG_COUNT + " messages.");
+
+               Assert.AreEqual(MSG_COUNT, CountOf(received));
+               session.Commit();
+           }
+
+           /// <summary>Listener for the DLQ consumer: records the dead-lettered message.</summary>
+           private void OnDLQMessage(IMessage message)
+           {
+               Tracer.DebugFormat("DLQ Message {0}", message);
+               lock (syncRoot)
+               {
+                   dlqed.Add(message);
+               }
+           }
+
+           /// <summary>Listener that rejects every message by rolling the session back.</summary>
+           private void OnMessageAlwaysRollsBack(IMessage message)
+           {
+               session.Rollback();
+           }
+
+           [Test]
+           public void testNonBlockingMessageDeleiveryWithAllRolledBack()
+           {
+               connection.RedeliveryPolicy.MaximumRedeliveries = 3;
+               session = connection.CreateSession(AcknowledgementMode.Transactional);
+               IDestination destination = session.CreateTemporaryQueue();
+               IDestination dlq = session.GetQueue("ActiveMQ.DLQ");
+               IMessageConsumer consumer = session.CreateConsumer(destination);
+               IMessageConsumer dlqConsumer = session.CreateConsumer(dlq);
+
+               dlqConsumer.Listener += OnDLQMessage;
+               consumer.Listener += OnMessage;
+
+               SendMessages(destination);
+               connection.Start();
+
+               AssertReceived(MSG_COUNT, "Pre-Rollback expects to receive: " + MSG_COUNT + " messages.");
+
+               consumer.Listener -= OnMessage;
+               consumer.Listener += OnMessageAlwaysRollsBack;
+
+               session.Rollback();
+
+               AssertReceived(dlqed, MSG_COUNT, "Post-Rollback expects to DLQ: " + MSG_COUNT + " messages.");
+
+               session.Commit();
+           }
+
+           /// <summary>
+           /// Best-effort removal of the ActiveMQ.DLQ queue so that each test
+           /// starts without dead letters left over from an earlier run.
+           /// </summary>
+           private void DeleteDLQ()
+           {
+               // A throw-away session is only needed to look up the queue; it
+               // is closed again instead of being left in the session field.
+               ISession dlqSession = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+               try
+               {
+                   IDestination dlq = dlqSession.GetQueue("ActiveMQ.DLQ");
+                   try
+                   {
+                       connection.DeleteDestination(dlq);
+                   }
+                   catch (Exception e)
+                   {
+                       // Deleting is optional; record the failure rather
+                       // than failing the test set up.
+                       Tracer.DebugFormat("Could not delete ActiveMQ.DLQ: {0}", e.Message);
+                   }
+               }
+               finally
+               {
+                   dlqSession.Close();
+               }
+           }
+
+           /// <summary>
+           /// Sends MSG_COUNT text messages, whose bodies are their send
+           /// indices, to the destination over a separate connection.
+           /// </summary>
+           private void SendMessages(IDestination destination)
+           {
+               IConnection producerConnection = CreateConnection();
+               try
+               {
+                   ISession producerSession =
+                       producerConnection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+                   IMessageProducer producer = producerSession.CreateProducer(destination);
+
+                   for (int i = 0; i < MSG_COUNT; ++i)
+                   {
+                       producer.Send(producerSession.CreateTextMessage(i.ToString()));
+                   }
+               }
+               finally
+               {
+                   // Release the connection even when sending failed.
+                   producerConnection.Close();
+               }
+           }
+       }
+}
+

Propchange: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NonBlockingConsumerRedeliveryTest.cs
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to