[ https://issues.apache.org/jira/browse/AMQNET-474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Timothy Bish resolved AMQNET-474.
---------------------------------
Resolution: Fixed
Fix Version/s: 1.7.0, 1.6.4
Fire error events via OnAsyncException as notifications, which won't terminate
the connection on error when using the failover transport.
> DTC Consumer is forcibly closed if a transaction is in progress and
> connection to the broker is interrupted
> -----------------------------------------------------------------------------------------------------------
>
> Key: AMQNET-474
> URL: https://issues.apache.org/jira/browse/AMQNET-474
> Project: ActiveMQ .Net
> Issue Type: Bug
> Components: ActiveMQ
> Affects Versions: 1.6.2
> Reporter: Imran
> Assignee: Timothy Bish
> Fix For: 1.6.4, 1.7.0
>
> Attachments: NetTxTransactionContext.cs.patch
>
>
> DTC Consumer is forcibly closed if a transaction is in progress and the
> connection to the broker is interrupted. This behavior is different from that
> of non-DTC consumers. This happens with a failover connection specified, which
> is not the correct behavior, as you would expect the failover feature to
> reestablish the connection on behalf of the client.
> {code}
> using System;
> using System.ServiceProcess;
> using System.Transactions;
> using Apache.NMS;
> using Apache.NMS.ActiveMQ;
> using Apache.NMS.Policies;
> using Apache.NMS.Util;
> using Common.Logging;
> using Common.Logging.Simple;
> using NUnit.Framework;
> namespace IntegrationTests.ApacheNms.Tests.Jira.DistributedTransaction
> {
> [TestFixture]
> public class BrokerRestartAndFailover
> {
> [Test, Explicit("After a broker restart the consumer is forcibly
> closed. This is not desirable as this behaviour is different to non dtc
> consumers.")]
> public void Should_rediliver_message_after_broker_restart()
> {
> SendMessageToQueue("1");
> var session =
> _connection.CreateSession(AcknowledgementMode.Transactional);
> var consumer =
> session.CreateConsumer(SessionUtil.GetDestination(session, InQueue));
> var transaction = new
> TransactionScope(TransactionScopeOption.RequiresNew);
> consumer.Receive(TimeSpan.FromSeconds(1));
> StopService(ActiveMqMaster);
> StartService(ActiveMqMaster);
> transaction.Complete();
> transaction.Dispose();
> //try a few times to drain the queue
> var messageRedilivered = 0;
> for (var i = 0; i < 2; i++)
> {
> transaction = new
> TransactionScope(TransactionScopeOption.RequiresNew);
> try
> {
> var message = consumer.Receive(TimeSpan.FromSeconds(1));
> transaction.Complete();
> if (message != null)
> messageRedilivered++;
> }
> catch (Exception ex)
> {
> LogManager.GetCurrentClassLogger().Error(ex);
> }
> finally
> {
> transaction.Dispose();
> }
> }
> Assert.That(CountMessagesInQueue(InQueue), Is.EqualTo(0));
> Assert.That(messageRedilivered, Is.EqualTo(1));
> }
> public int CountMessagesInQueue(string queue)
> {
> var factory = new ConnectionFactory(ConnectionString)
> {
> AcknowledgementMode = AcknowledgementMode.Transactional
> };
>
> var count = 0;
> using (var connection = factory.CreateConnection())
> using (var session = connection.CreateSession())
> using (var consumer =
> session.CreateConsumer(SessionUtil.GetQueue(session, queue)))
> {
> connection.Start();
> while (true)
> {
> var message = consumer.Receive(TimeSpan.FromSeconds(1));
> if (message == null)
> break;
> count++;
> }
> }
> return count;
> }
> private void DeleteQueue(string queue)
> {
> using (var session = _connection.CreateSession())
> {
> SessionUtil.DeleteDestination(session, queue);
> }
> }
> private void SendMessageToQueue(string message)
> {
> using (var session = _connection.CreateSession())
> using (var producer =
> session.CreateProducer(SessionUtil.GetDestination(session, InQueue)))
> using (var scope = new
> TransactionScope(TransactionScopeOption.RequiresNew))
> {
> producer.Send(producer.CreateTextMessage(message));
> scope.Complete();
> }
> Log.Debug("Primed Input Queue");
> }
> private void StartService(ServiceController service)
> {
> if(service.Status != ServiceControllerStatus.Running)
> service.Start();
> service.WaitForStatus(ServiceControllerStatus.Running);
> }
> private void StopService(ServiceController service)
> {
> if (service.Status != ServiceControllerStatus.Stopped)
> service.Stop();
> service.WaitForStatus(ServiceControllerStatus.Stopped);
> }
> [SetUp]
> public void TestSetup()
> {
> LogManager.Adapter = new
> ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss");
> StartService(ActiveMqMaster);
> StopService(ActiveMqSlave);
> _connectionFactory = new NetTxConnectionFactory(ConnectionString)
> {
> AcknowledgementMode = AcknowledgementMode.Transactional,
> RedeliveryPolicy = new RedeliveryPolicy {
> InitialRedeliveryDelay = 10, MaximumRedeliveries = 3, BackOffMultiplier = 0,
> UseExponentialBackOff = false },
> DispatchAsync = true,
> AsyncSend = false,
> PrefetchPolicy = new PrefetchPolicy { All = 10 },
> };
> _connection = _connectionFactory.CreateConnection();
> _connection.ConnectionInterruptedListener += () =>
> LogManager.GetCurrentClassLogger().Debug("Connection interrupted");
> _connection.ConnectionResumedListener += () =>
> LogManager.GetCurrentClassLogger().Debug("Connection resumed");
> _connection.ExceptionListener += ex =>
> LogManager.GetCurrentClassLogger().ErrorFormat("Connection exception: '{0}'",
> ex.ToString());
> _connection.Start();
> DeleteQueue(InQueue);
> DeleteQueue(OutQueue);
> }
> [TearDown]
> public void TestTeardown()
> {
> StartService(ActiveMqMaster);
> StopService(ActiveMqSlave);
> }
> private const string ConnectionString =
> @"failover:(tcp://localhost:61616)";
> protected ServiceController ActiveMqMaster = new
> ServiceController(@"ActiveMQ");
> protected ServiceController ActiveMqSlave = new
> ServiceController(@"ActiveMQSlave");
> private IConnection _connection;
> private const string InQueue = "in-q";
> private const string OutQueue = "out-q";
> private static readonly ILog Log =
> LogManager.GetLogger(typeof(BrokerRestartAndFailover).Name);
> private NetTxConnectionFactory _connectionFactory;
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.2#6252)