Author: tabish
Date: Fri Apr 26 19:26:46 2013
New Revision: 1476352
URL: http://svn.apache.org/r1476352
Log:
fix: https://issues.apache.org/jira/browse/AMQNET-410
Added:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NonBlockingConsumerRedeliveryTest.cs
(with props)
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1476352&r1=1476351&r2=1476352&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
Fri Apr 26 19:26:46 2013
@@ -1405,6 +1405,12 @@ namespace Apache.NMS.ActiveMQ
// We need to NACK the messages
so that they get sent to the DLQ.
MessageAck ack = new MessageAck(lastMd, (byte)
AckType.PoisonAck, dispatchedMessages.Count);
+ if(Tracer.IsDebugEnabled)
+ {
+
Tracer.DebugFormat("Consumer {0} Poison Ack of {1} messages aft max
redeliveries: {2}",
+ this.info.ConsumerId,
this.dispatchedMessages.Count, this.redeliveryPolicy.MaximumRedeliveries);
+ }
+
if (lastMd.RollbackCause !=
null)
{
BrokerError cause = new
BrokerError();
@@ -1433,31 +1439,59 @@ namespace Apache.NMS.ActiveMQ
this.session.SendAck(ack);
}
- // stop the delivery of
messages.
- this.unconsumedMessages.Stop();
-
- if(Tracer.IsDebugEnabled)
- {
- Tracer.DebugFormat("Consumer {0} Rolled Back,
Re-enque {1} messages",
- this.info.ConsumerId,
this.dispatchedMessages.Count);
- }
-
- foreach(MessageDispatch
dispatch in this.dispatchedMessages)
+ if (this.nonBlockingRedelivery)
{
- this.unconsumedMessages.EnqueueFirst(dispatch);
- }
-
- this.deliveredCounter -=
this.dispatchedMessages.Count;
- this.dispatchedMessages.Clear();
+ if(redeliveryDelay == 0)
+ {
+ redeliveryDelay
= RedeliveryPolicy.InitialRedeliveryDelay;
+ }
- if(redeliveryDelay > 0 &&
!this.unconsumedMessages.Closed)
- {
- DateTime deadline =
DateTime.Now.AddMilliseconds(redeliveryDelay);
-
ThreadPool.QueueUserWorkItem(this.RollbackHelper, deadline);
+ if(Tracer.IsDebugEnabled)
+ {
+
Tracer.DebugFormat("Consumer {0} Rolled Back, Re-enque {1} messages in
Non-Blocking mode, delay: {2}",
+ this.info.ConsumerId,
this.dispatchedMessages.Count, redeliveryDelay);
+ }
+
+ List<MessageDispatch> pendingRedeliveries =
+ new
List<MessageDispatch>(this.dispatchedMessages);
+
pendingRedeliveries.Reverse();
+
+ this.deliveredCounter
-= this.dispatchedMessages.Count;
+
this.dispatchedMessages.Clear();
+
+
this.session.Scheduler.ExecuteAfterDelay(
+
NonBlockingRedeliveryCallback,
+
pendingRedeliveries,
+
TimeSpan.FromMilliseconds(redeliveryDelay));
}
- else
+ else
{
- Start();
+ // stop the delivery of
messages.
+
this.unconsumedMessages.Stop();
+
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("Consumer {0} Rolled
Back, Re-enque {1} messages",
+ this.info.ConsumerId,
this.dispatchedMessages.Count);
+ }
+
+ foreach(MessageDispatch
dispatch in this.dispatchedMessages)
+ {
+
this.unconsumedMessages.EnqueueFirst(dispatch);
+ }
+
+ this.deliveredCounter
-= this.dispatchedMessages.Count;
+
this.dispatchedMessages.Clear();
+
+ if(redeliveryDelay > 0
&& !this.unconsumedMessages.Closed)
+ {
+ DateTime
deadline = DateTime.Now.AddMilliseconds(redeliveryDelay);
+
ThreadPool.QueueUserWorkItem(this.RollbackHelper, deadline);
+ }
+ else
+ {
+ Start();
+ }
}
}
}
@@ -1493,6 +1527,26 @@ namespace Apache.NMS.ActiveMQ
}
}
+ private void NonBlockingRedeliveryCallback(object arg)
+ {
+ try
+ {
+ if (!this.unconsumedMessages.Closed)
+ {
+ List<MessageDispatch>
pendingRedeliveries = arg as List<MessageDispatch>;
+
+ foreach (MessageDispatch dispatch in pendingRedeliveries)
+ {
+ session.Dispatch(dispatch);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ session.Connection.OnAsyncException(e);
+ }
+ }
+
private ActiveMQMessage CreateActiveMQMessage(MessageDispatch
dispatch)
{
ActiveMQMessage message = dispatch.Message.Clone() as
ActiveMQMessage;
Added:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NonBlockingConsumerRedeliveryTest.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NonBlockingConsumerRedeliveryTest.cs?rev=1476352&view=auto
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NonBlockingConsumerRedeliveryTest.cs
(added)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NonBlockingConsumerRedeliveryTest.cs
Fri Apr 26 19:26:46 2013
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using Apache.NMS.Test;
+using Apache.NMS.ActiveMQ.Commands;
+using Apache.NMS.Util;
+using NUnit.Framework;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+ [TestFixture]
+ public class NonBlockingConsumerRedeliveryTest : NMSTestSupport
+ {
+ private Connection connection;
+ private ISession session;
+ private readonly int MSG_COUNT = 5;
+ private int count;
+
+ private List<IMessage> received = new List<IMessage>();
+ private List<IMessage> dlqed = new List<IMessage>();
+ private List<IMessage> beforeRollback = new List<IMessage>();
+ private List<IMessage> afterRollback = new List<IMessage>();
+
+ [SetUp]
+ public override void SetUp()
+ {
+ base.SetUp();
+
+ session = null;
+ connection = (Connection) CreateConnection();
+ connection.NonBlockingRedelivery = true;
+
+ DeleteDLQ();
+
+ this.received.Clear();
+ this.beforeRollback.Clear();
+ this.afterRollback.Clear();
+ this.dlqed.Clear();
+ this.count = 0;
+ }
+
+ [TearDown]
+ public override void TearDown()
+ {
+ if(this.connection != null)
+ {
+ this.connection.Close();
+ this.connection = null;
+ }
+
+ base.TearDown();
+ }
+
+ public void OnMessage(IMessage message)
+ {
+ this.received.Add(message);
+ }
+
+ private void AssertReceived(int count, String message)
+ {
+ AssertReceived(this.received, count, message);
+ }
+
+ private void AssertReceived(List<IMessage> target, int count,
String message)
+ {
+ for (int i = 0; i < 30; ++i)
+ {
+ if (target.Count == count)
+ {
+ break;
+ }
+ Thread.Sleep(1000);
+ }
+ Assert.AreEqual(count, target.Count, message);
+ }
+
+ [Test]
+ public void testMessageDeleiveredWhenNonBlockingEnabled()
+ {
+ session =
connection.CreateSession(AcknowledgementMode.Transactional);
+ IDestination destination =
session.CreateTemporaryQueue();
+ IMessageConsumer consumer = session.CreateConsumer(destination);
+
+ consumer.Listener += OnMessage;
+
+ SendMessages(destination);
+ session.Commit();
+
+ connection.Start();
+
+ AssertReceived(MSG_COUNT, "Pre-Rollback expects to
receive: " + MSG_COUNT + " messages.");
+
+ this.beforeRollback.AddRange(received);
+ this.received.Clear();
+ session.Rollback();
+
+ AssertReceived(MSG_COUNT, "Post-Rollback expects to
receive: " + MSG_COUNT + " messages.");
+
+ this.afterRollback.AddRange(received);
+ this.received.Clear();
+
+ Assert.AreEqual(this.beforeRollback.Count,
this.afterRollback.Count);
+ Assert.AreEqual(this.beforeRollback, this.afterRollback);
+ session.Commit();
+ }
+
+ [Test]
+ public void testMessageDeleiveredInCorrectOrder()
+ {
+ session =
connection.CreateSession(AcknowledgementMode.Transactional);
+ IDestination destination = session.CreateTemporaryQueue();
+ IMessageConsumer consumer = session.CreateConsumer(destination);
+
+ consumer.Listener += OnMessage;
+
+ SendMessages(destination);
+
+ session.Commit();
+ connection.Start();
+
+ AssertReceived(MSG_COUNT, "Pre-Rollback expects to
receive: " + MSG_COUNT + " messages.");
+
+ beforeRollback.AddRange(received);
+ received.Clear();
+ session.Rollback();
+
+ AssertReceived(MSG_COUNT, "Post-Rollback expects to
receive: " + MSG_COUNT + " messages.");
+
+ afterRollback.AddRange(received);
+ received.Clear();
+
+ Assert.AreEqual(beforeRollback.Count, afterRollback.Count);
+ Assert.AreEqual(beforeRollback, afterRollback);
+
+ IEnumerator<IMessage> after =
afterRollback.GetEnumerator();
+ IEnumerator<IMessage> before =
beforeRollback.GetEnumerator();
+
+ while (after.MoveNext() && before.MoveNext())
+ {
+ ITextMessage original = before.Current as
ITextMessage;
+ ITextMessage rolledBack = after.Current as
ITextMessage;
+
+ int originalId = Int32.Parse(original.Text);
+ int rolledBackId = Int32.Parse
(rolledBack.Text);
+
+ Assert.AreEqual(originalId, rolledBackId);
+ }
+
+ session.Commit();
+ }
+
+ [Test]
+ public void testMessageDeleiveryDoesntStop()
+ {
+ session =
connection.CreateSession(AcknowledgementMode.Transactional);
+ IDestination destination =
session.CreateTemporaryQueue();
+ IMessageConsumer consumer = session.CreateConsumer(destination);
+
+ consumer.Listener += OnMessage;
+
+ SendMessages(destination);
+ connection.Start();
+
+ AssertReceived(MSG_COUNT, "Pre-Rollback expects to
receive: " + MSG_COUNT + " messages.");
+
+ beforeRollback.AddRange(received);
+ received.Clear();
+ session.Rollback();
+
+ SendMessages(destination);
+
+ AssertReceived(MSG_COUNT * 2, "Post-Rollback expects to
receive: " + MSG_COUNT * 2 + " messages.");
+
+ afterRollback.AddRange(received);
+ received.Clear();
+
+ Assert.AreEqual(beforeRollback.Count * 2, afterRollback.Count);
+
+ session.Commit();
+ }
+
+ [Test]
+ public void testNonBlockingMessageDeleiveryIsDelayed()
+ {
+ connection.RedeliveryPolicy.InitialRedeliveryDelay = 7000;
+ session =
connection.CreateSession(AcknowledgementMode.Transactional);
+ IDestination destination = session.CreateTemporaryQueue();
+ IMessageConsumer consumer = session.CreateConsumer(destination);
+
+ consumer.Listener += OnMessage;
+
+ SendMessages(destination);
+ connection.Start();
+
+ AssertReceived(MSG_COUNT, "Pre-Rollback expects to
receive: " + MSG_COUNT + " messages.");
+
+ received.Clear();
+ session.Rollback();
+
+ Thread.Sleep(4000);
+ Assert.IsFalse(this.received.Count > 0, "Delayed
redelivery test not expecting any messages yet.");
+
+ AssertReceived(MSG_COUNT, "Post-Rollback expects to
receive: " + MSG_COUNT + " messages.");
+
+ session.Commit();
+ session.Close();
+ }
+
+ public void OnMessageWithSomeRollbacks(IMessage message)
+ {
+ if (++count > 10)
+ {
+ try
+ {
+ session.Rollback();
+ Tracer.Info("Rolling back session.");
+ count = 0;
+ }
+ catch (Exception e)
+ {
+ Tracer.WarnFormat("Caught an unexcepted
exception: {0}", e.Message);
+ }
+ }
+ else
+ {
+ received.Add(message);
+ try
+ {
+ session.Commit();
+ }
+ catch (Exception e)
+ {
+ Tracer.WarnFormat("Caught an unexcepted
exception: {0}", e.Message);
+ }
+ }
+ }
+
+ [Test]
+ public void testNonBlockingMessageDeleiveryWithRollbacks()
+ {
+ session =
connection.CreateSession(AcknowledgementMode.Transactional);
+ IDestination destination = session.CreateTemporaryQueue();
+ IMessageConsumer consumer = session.CreateConsumer(destination);
+
+ consumer.Listener += OnMessage;
+
+ SendMessages(destination);
+ connection.Start();
+
+ AssertReceived(MSG_COUNT, "Pre-Rollback expects to
receive: " + MSG_COUNT + " messages.");
+
+ received.Clear();
+
+ consumer.Listener -= OnMessage;
+ consumer.Listener += OnMessageWithSomeRollbacks;
+
+ session.Rollback();
+
+ AssertReceived(MSG_COUNT, "Post-Rollback expects to
receive: " + MSG_COUNT + " messages.");
+
+ Assert.AreEqual(MSG_COUNT, received.Count);
+ session.Commit();
+ }
+
+ private void OnDLQMessage(IMessage message)
+ {
+ Tracer.DebugFormat("DLQ Message {0}", message);
+ dlqed.Add(message);
+ }
+
+ private void OnMessageAlwaysRollsBack(IMessage message)
+ {
+ session.Rollback();
+ }
+
+ [Test]
+ public void testNonBlockingMessageDeleiveryWithAllRolledBack()
+ {
+ connection.RedeliveryPolicy.MaximumRedeliveries = 3;
+ session =
connection.CreateSession(AcknowledgementMode.Transactional);
+ IDestination destination = session.CreateTemporaryQueue();
+ IDestination dlq = session.GetQueue("ActiveMQ.DLQ");
+ IMessageConsumer consumer = session.CreateConsumer(destination);
+ IMessageConsumer dlqConsumer = session.CreateConsumer(dlq);
+
+ dlqConsumer.Listener += OnDLQMessage;
+ consumer.Listener += OnMessage;
+
+ SendMessages(destination);
+ connection.Start();
+
+ AssertReceived(MSG_COUNT, "Pre-Rollback expects to
receive: " + MSG_COUNT + " messages.");
+
+ consumer.Listener -= OnMessage;
+ consumer.Listener += OnMessageAlwaysRollsBack;
+
+ session.Rollback();
+
+ AssertReceived(dlqed, MSG_COUNT, "Post-Rollback expects
to DLQ: " + MSG_COUNT + " messages.");
+
+ session.Commit();
+ }
+
+ private void DeleteDLQ()
+ {
+ session =
connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IDestination dlq = session.GetQueue("ActiveMQ.DLQ");
+ try
+ {
+ connection.DeleteDestination(dlq);
+ }
+ catch
+ {
+ }
+ }
+
+ private void SendMessages(IDestination destination)
+ {
+ IConnection connection = CreateConnection();
+ ISession session =
connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IMessageProducer producer = session.CreateProducer(destination);
+ for(int i = 0; i < MSG_COUNT; ++i)
+ {
+ producer.Send(session.CreateTextMessage("" + i));
+ }
+ connection.Close();
+ }
+ }
+}
+
Propchange:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/NonBlockingConsumerRedeliveryTest.cs
------------------------------------------------------------------------------
svn:eol-style = native