[ 
https://issues.apache.org/jira/browse/AMQNET-471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Imran updated AMQNET-471:
-------------------------

    Description: 
If the broker is down then the client cannot commit the current message. An 
exception is thrown by the library. This is the behavior you would expect.

If you then try to roll back the transaction on the session because of the 
exception and resume message consumption, the rolled-back message will never be 
redelivered.

{code:title=Failing Test|borderStyle=solid} 
 [TestFixture, Explicit]
    // Integration test reproducing AMQNET-471: a message is received in a
    // transacted session, the broker is stopped, the commit is attempted and
    // rolled back on failure, and the message is expected to be delivered
    // again after the broker has been restarted.
    // The fixture stops and starts the local "ActiveMQ" Windows service.
    public class BrokerRestart
    {
        // Maximum number of exception characters written to the log.
        private const int MaxLoggedExceptionLength = 250;

        [Test]
        public void
            Message_should_be_redilivered_if_broker_is_down_and_try_commit()
        {
            StartService(ActiveMqMaster);
            DeleteQueue();
            SendMessageToQueue();
            var session =
                _connection.CreateSession(AcknowledgementMode.Transactional);
            var consumer = session.CreateConsumer(
                SessionUtil.GetDestination(session, InQ));

            // First delivery; the commit for it is attempted below while the
            // broker is stopped.
            consumer.Receive(TimeSpan.FromSeconds(30));
            _log.Debug("Received message");
            StopService(ActiveMqMaster);
            _log.Debug("Commiting transaction");
            try
            {
                session.Commit();
            }
            catch (Exception ex)
            {
                _log.ErrorFormat("Exception: {0}", Truncate(ex.ToString()));
                // Logged before the call so the attempt is always visible,
                // not only when the rollback itself fails.
                _log.Debug("Rollback transaction");
                try
                {
                    session.Rollback();
                }
                catch (Exception einner)
                {
                    _log.ErrorFormat("Exception: {0}",
                        Truncate(einner.ToString()));
                }
            }
            StartService(ActiveMqMaster);
            var message = consumer.Receive(TimeSpan.FromSeconds(30));

            Assert.That(message, Is.Not.Null, "message was not redilivered");
        }

        // Returns at most MaxLoggedExceptionLength characters of text.
        // Substring(0, 250) alone throws ArgumentOutOfRangeException when the
        // text is shorter than 250 characters.
        private static string Truncate(string text)
        {
            return text.Length <= MaxLoggedExceptionLength
                ? text
                : text.Substring(0, MaxLoggedExceptionLength);
        }

        // Starts the service unless it is running or already starting, then
        // blocks until it reports Running.
        private void StartService(ServiceController service)
        {
            // Re-read the state so the decision is not based on a stale value.
            service.Refresh();
            var status = service.Status;
            if (status != ServiceControllerStatus.Running &&
                status != ServiceControllerStatus.StartPending)
            {
                service.Start();
            }
            service.WaitForStatus(ServiceControllerStatus.Running);
            _log.Debug("Started Broker");
        }

        // Stops the service unless it is stopped or already stopping, then
        // blocks until it reports Stopped.
        private void StopService(ServiceController service)
        {
            service.Refresh();
            var status = service.Status;
            if (status != ServiceControllerStatus.Stopped &&
                status != ServiceControllerStatus.StopPending)
            {
                service.Stop();
            }
            service.WaitForStatus(ServiceControllerStatus.Stopped);
            _log.Debug("Stopped Broker");
        }

        // Sends one text message ("1") to the input queue and commits it.
        private void SendMessageToQueue()
        {
            using (var session = _connection.CreateSession())
            using (var producer = session.CreateProducer(
                SessionUtil.GetDestination(session, InQ)))
            {
                producer.Send(producer.CreateTextMessage(1.ToString()));
                session.Commit();
            }
            _log.Debug("Primed Input Queue");
        }

        // Deletes the input queue so each run starts without old messages.
        private void DeleteQueue()
        {
            using (var session = _connection.CreateSession())
            {
                SessionUtil.DeleteDestination(session, InQ);
            }
        }

        [SetUp]
        public void TestSetup()
        {
            // "mm" is minutes; the previous "MM" printed the month instead.
            LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(
                LogLevel.Debug, true, true, true, "HH:mm:ss");
            _log = LogManager.GetLogger(typeof (BrokerRestart).Name);
            var factory =
                new ConnectionFactory(@"failover:(tcp://localhost:61616)")
            {
                AcknowledgementMode = AcknowledgementMode.Transactional,
                RedeliveryPolicy = new RedeliveryPolicy
                {
                    InitialRedeliveryDelay = 0,
                    MaximumRedeliveries = 3,
                    BackOffMultiplier = 0,
                    UseExponentialBackOff = false
                },
                AsyncSend = false,
                PrefetchPolicy = new PrefetchPolicy {All = 5}
            };
            _connection = factory.CreateConnection();
            _connection.Start();
            //Tracer.Trace = new CommonLoggingTraceAdapter();
        }

        // Releases the connection created in TestSetup; it may be null when
        // TestSetup failed before the connection was created.
        [TearDown]
        public void TestTearDown()
        {
            if (_connection != null)
            {
                _connection.Dispose();
                _connection = null;
            }
        }

        protected ServiceController ActiveMqMaster =
            new ServiceController(@"ActiveMQ");
        //protected ServiceController ActiveMqSlave = new ServiceController(@"ActiveMQSlave");
        private IConnection _connection;
        private const string InQ = "integration-test-q";
        private ILog _log;
    }
{code}

{code:title=Passing Test With Patch|borderStyle=solid} 
using System;
using System.Configuration;
using System.ServiceProcess;
using System.Threading;
using System.Threading.Tasks;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.Policies;
using Apache.NMS.Util;
using Common.Logging;
using Common.Logging.Simple;
using NUnit.Framework;

namespace IntegrationTests.ApacheNms.Jira
{
    // Integration test for AMQNET-471: a commit is attempted on a background
    // task while the broker service is stopped, the broker is restarted, and
    // the test then checks that a message is received again on the same
    // consumer. Service names, machine name and the connection string are
    // read from the application configuration.
    [TestFixture]
    public class BrokerRestart
    {
        // Maximum number of exception characters written to the log.
        private const int MaxLoggedExceptionLength = 250;

        //AMQNET-471
        [Test]
        public void
            Message_should_be_redilivered_if_broker_is_down_and_try_to_commit()
        {
            var session =
                _connection.CreateSession(AcknowledgementMode.Transactional);
            var consumer = session.CreateConsumer(
                SessionUtil.GetDestination(session, InQueue));
            SendMessageToQueue();

            // First delivery; its commit is attempted while the broker is down.
            consumer.Receive(TimeSpan.FromSeconds(5));
            StopService(ActiveMqMaster);
            var commiter = TryCommit(session);

            StartService(ActiveMqMaster);
            commiter.Wait();
            var message = consumer.Receive(TimeSpan.FromSeconds(5));
            TryCommit(session).Wait();

            Assert.That(message, Is.Not.Null, "message was not redilivered");
            Assert.That(CountMessagesInQueue(OutQueue), Is.EqualTo(0));
        }

        //Commit blocks if the broker is down with the patch for AMQNET-471
        // The commit therefore runs on a separate task. If it throws, the
        // exception is logged and a rollback is attempted. The returned task
        // completes once the commit (and any rollback) has finished.
        private Task TryCommit(ISession session)
        {
            var task = Task.Factory.StartNew(() =>
            {
                try
                {
                    session.Commit();
                }
                catch (Exception ex)
                {
                    _log.ErrorFormat("Exception: {0}",
                        Truncate(ex.ToString()));
                    // Logged before the call so the attempt is always
                    // visible, not only when the rollback itself fails.
                    _log.Debug("Rollback transaction");
                    try
                    {
                        session.Rollback();
                    }
                    catch (Exception einner)
                    {
                        _log.ErrorFormat("Exception: {0}",
                            Truncate(einner.ToString()));
                    }
                }
            });

            //Give it a chance to start.
            Thread.Sleep(1000);

            return task;
        }

        // Returns at most MaxLoggedExceptionLength characters of text.
        // Substring(0, 250) alone throws ArgumentOutOfRangeException when the
        // text is shorter than 250 characters.
        private static string Truncate(string text)
        {
            return text.Length <= MaxLoggedExceptionLength
                ? text
                : text.Substring(0, MaxLoggedExceptionLength);
        }

        // Receives from the given queue until a 2 second receive returns
        // null and reports how many messages were received. The session is
        // disposed without calling Commit.
        private int CountMessagesInQueue(string queue)
        {
            var count = 0;
            using (var session =
                _connection.CreateSession(AcknowledgementMode.Transactional))
            using (var consumerIn = session.CreateConsumer(
                SessionUtil.GetDestination(session, queue)))
            {
                while (consumerIn.Receive(TimeSpan.FromSeconds(2)) != null)
                {
                    count++;
                }
            }

            return count;
        }

        // Starts the service unless it is running or already starting, then
        // blocks until it reports Running. The previous condition
        // (!= Running || == StartPending) issued Start() while a start was
        // already pending.
        private void StartService(ServiceController service)
        {
            // Re-read the state so the decision is not based on a stale value.
            service.Refresh();
            var status = service.Status;
            if (status != ServiceControllerStatus.Running &&
                status != ServiceControllerStatus.StartPending)
            {
                service.Start();
            }
            service.WaitForStatus(ServiceControllerStatus.Running);
            _log.Debug("Started Broker");
        }

        // Stops the service unless it is stopped or already stopping, then
        // blocks until it reports Stopped.
        private void StopService(ServiceController service)
        {
            service.Refresh();
            var status = service.Status;
            if (status != ServiceControllerStatus.Stopped &&
                status != ServiceControllerStatus.StopPending)
            {
                service.Stop();
            }
            service.WaitForStatus(ServiceControllerStatus.Stopped);
            _log.Debug("Stopped Broker");
        }

        // Sends one text message ("1") to the input queue and commits it.
        private void SendMessageToQueue()
        {
            using (var session = _connection.CreateSession())
            using (var producer = session.CreateProducer(
                SessionUtil.GetDestination(session, InQueue)))
            {
                producer.Send(producer.CreateTextMessage(1.ToString()));
                session.Commit();
            }
            _log.Debug("Primed Input Queue");
        }

        // Deletes the named queue so each run starts without old messages.
        private void DeleteQueue(string queue)
        {
            using (var session = _connection.CreateSession())
            {
                SessionUtil.DeleteDestination(session, queue);
            }
        }

        [SetUp]
        public void TestSetup()
        {
            // "mm" is minutes; the previous "MM" printed the month instead.
            LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(
                LogLevel.Debug, true, true, true, "HH:mm:ss");
            _log = LogManager.GetLogger(typeof(BrokerRestart).Name);
            StartService(ActiveMqMaster);
            _factory = new ConnectionFactory(ActiveMqConnectionString)
            {
                AcknowledgementMode = AcknowledgementMode.Transactional,
                RedeliveryPolicy = new RedeliveryPolicy
                {
                    InitialRedeliveryDelay = 0,
                    MaximumRedeliveries = 3,
                    BackOffMultiplier = 0,
                    UseExponentialBackOff = false
                },
                AsyncSend = false,
                PrefetchPolicy = new PrefetchPolicy { All = 5 }
            };
            _connection = _factory.CreateConnection();
            _log.Debug("Starting connection");
            _connection.Start();
            _log.Debug("Connection established");

            DeleteQueue(InQueue);
            DeleteQueue(OutQueue);
            //Tracer.Trace = new CommonLoggingTraceAdapter();
        }

        // Releases the connection created in TestSetup; it may be null when
        // TestSetup failed before the connection was created.
        [TearDown]
        public void TestTearDown()
        {
            if (_connection != null)
            {
                _connection.Dispose();
                _connection = null;
            }
        }

        protected ServiceController ActiveMqMaster = new ServiceController(
            ActiveMasterServiceName, ActiveMqMachineName);
        protected ServiceController ActiveMqSlave = new ServiceController(
            ActiveMqSlaveServiceName, ActiveMqMachineName);
        private static readonly string ActiveMqMachineName =
            ConfigurationManager.AppSettings["ActiveMqServiceMachineName"];
        private static readonly string ActiveMqConnectionString =
            ConfigurationManager.ConnectionStrings["ActiveMqServer"]
                .ConnectionString;
        private static readonly string ActiveMasterServiceName =
            ConfigurationManager.AppSettings["ActiveMqMasterName"];
        private static readonly string ActiveMqSlaveServiceName =
            ConfigurationManager.AppSettings["ActiveMqSlaveName"];
        private IConnection _connection;
        private const string InQueue = "integration-test-q-in";
        private const string OutQueue = "integration-test-q-out";
        private ILog _log;
        private ConnectionFactory _factory;
    }
}
{code}

  was:
If the broker is down then the client cannot commit the current message. An 
exception is thrown by the library. This is the behavior you would expect.

If you then try to roll back the transaction on the session because of the 
exception and resume message consumption, the rolled-back message will never be 
redelivered.

{code:title=FailingTest|borderStyle=solid} 
 [TestFixture, Explicit]
    // Integration test reproducing AMQNET-471: a message is received in a
    // transacted session, the broker is stopped, the commit is attempted and
    // rolled back on failure, and the message is expected to be delivered
    // again after the broker has been restarted.
    // The fixture stops and starts the local "ActiveMQ" Windows service.
    public class BrokerRestart
    {
        // Maximum number of exception characters written to the log.
        private const int MaxLoggedExceptionLength = 250;

        [Test]
        public void
            Message_should_be_redilivered_if_broker_is_down_and_try_commit()
        {
            StartService(ActiveMqMaster);
            DeleteQueue();
            SendMessageToQueue();
            var session =
                _connection.CreateSession(AcknowledgementMode.Transactional);
            var consumer = session.CreateConsumer(
                SessionUtil.GetDestination(session, InQ));

            // First delivery; the commit for it is attempted below while the
            // broker is stopped.
            consumer.Receive(TimeSpan.FromSeconds(30));
            _log.Debug("Received message");
            StopService(ActiveMqMaster);
            _log.Debug("Commiting transaction");
            try
            {
                session.Commit();
            }
            catch (Exception ex)
            {
                _log.ErrorFormat("Exception: {0}", Truncate(ex.ToString()));
                // Logged before the call so the attempt is always visible,
                // not only when the rollback itself fails.
                _log.Debug("Rollback transaction");
                try
                {
                    session.Rollback();
                }
                catch (Exception einner)
                {
                    _log.ErrorFormat("Exception: {0}",
                        Truncate(einner.ToString()));
                }
            }
            StartService(ActiveMqMaster);
            var message = consumer.Receive(TimeSpan.FromSeconds(30));

            Assert.That(message, Is.Not.Null, "message was not redilivered");
        }

        // Returns at most MaxLoggedExceptionLength characters of text.
        // Substring(0, 250) alone throws ArgumentOutOfRangeException when the
        // text is shorter than 250 characters.
        private static string Truncate(string text)
        {
            return text.Length <= MaxLoggedExceptionLength
                ? text
                : text.Substring(0, MaxLoggedExceptionLength);
        }

        // Starts the service unless it is running or already starting, then
        // blocks until it reports Running.
        private void StartService(ServiceController service)
        {
            // Re-read the state so the decision is not based on a stale value.
            service.Refresh();
            var status = service.Status;
            if (status != ServiceControllerStatus.Running &&
                status != ServiceControllerStatus.StartPending)
            {
                service.Start();
            }
            service.WaitForStatus(ServiceControllerStatus.Running);
            _log.Debug("Started Broker");
        }

        // Stops the service unless it is stopped or already stopping, then
        // blocks until it reports Stopped.
        private void StopService(ServiceController service)
        {
            service.Refresh();
            var status = service.Status;
            if (status != ServiceControllerStatus.Stopped &&
                status != ServiceControllerStatus.StopPending)
            {
                service.Stop();
            }
            service.WaitForStatus(ServiceControllerStatus.Stopped);
            _log.Debug("Stopped Broker");
        }

        // Sends one text message ("1") to the input queue and commits it.
        private void SendMessageToQueue()
        {
            using (var session = _connection.CreateSession())
            using (var producer = session.CreateProducer(
                SessionUtil.GetDestination(session, InQ)))
            {
                producer.Send(producer.CreateTextMessage(1.ToString()));
                session.Commit();
            }
            _log.Debug("Primed Input Queue");
        }

        // Deletes the input queue so each run starts without old messages.
        private void DeleteQueue()
        {
            using (var session = _connection.CreateSession())
            {
                SessionUtil.DeleteDestination(session, InQ);
            }
        }

        [SetUp]
        public void TestSetup()
        {
            // "mm" is minutes; the previous "MM" printed the month instead.
            LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(
                LogLevel.Debug, true, true, true, "HH:mm:ss");
            _log = LogManager.GetLogger(typeof (BrokerRestart).Name);
            var factory =
                new ConnectionFactory(@"failover:(tcp://localhost:61616)")
            {
                AcknowledgementMode = AcknowledgementMode.Transactional,
                RedeliveryPolicy = new RedeliveryPolicy
                {
                    InitialRedeliveryDelay = 0,
                    MaximumRedeliveries = 3,
                    BackOffMultiplier = 0,
                    UseExponentialBackOff = false
                },
                AsyncSend = false,
                PrefetchPolicy = new PrefetchPolicy {All = 5}
            };
            _connection = factory.CreateConnection();
            _connection.Start();
            //Tracer.Trace = new CommonLoggingTraceAdapter();
        }

        // Releases the connection created in TestSetup; it may be null when
        // TestSetup failed before the connection was created.
        [TearDown]
        public void TestTearDown()
        {
            if (_connection != null)
            {
                _connection.Dispose();
                _connection = null;
            }
        }

        protected ServiceController ActiveMqMaster =
            new ServiceController(@"ActiveMQ");
        //protected ServiceController ActiveMqSlave = new ServiceController(@"ActiveMQSlave");
        private IConnection _connection;
        private const string InQ = "integration-test-q";
        private ILog _log;
    }
{code}


> Synchronous message consumer will lose a message that failed to commit whilst 
> the broker was unavailable
> --------------------------------------------------------------------------------------------------------
>
>                 Key: AMQNET-471
>                 URL: https://issues.apache.org/jira/browse/AMQNET-471
>             Project: ActiveMQ .Net
>          Issue Type: Bug
>          Components: ActiveMQ
>    Affects Versions: 1.6.2
>            Reporter: Imran
>            Assignee: Jim Gomes
>         Attachments: TransactionContext.cs.patch
>
>
> If the broker is down then the client cannot commit the current message. An 
> exception is thrown by the library. This is the behavior you would expect.
> If you then try to roll back the transaction on the session because of the 
> exception and resume message consumption, the rolled-back message will never 
> be redelivered.
> {code:title=Failing Test|borderStyle=solid} 
>  [TestFixture, Explicit]
>     // Integration test reproducing AMQNET-471: a message is received in a
>     // transacted session, the broker is stopped, the commit is attempted and
>     // rolled back on failure, and the message is expected to be delivered
>     // again after the broker has been restarted.
>     // The fixture stops and starts the local "ActiveMQ" Windows service.
>     public class BrokerRestart
>     {
>         // Maximum number of exception characters written to the log.
>         private const int MaxLoggedExceptionLength = 250;
>         [Test]
>         public void
>             Message_should_be_redilivered_if_broker_is_down_and_try_commit()
>         {
>             StartService(ActiveMqMaster);
>             DeleteQueue();
>             SendMessageToQueue();
>             var session =
>                 _connection.CreateSession(AcknowledgementMode.Transactional);
>             var consumer = session.CreateConsumer(
>                 SessionUtil.GetDestination(session, InQ));
>             // First delivery; the commit for it is attempted below while the
>             // broker is stopped.
>             consumer.Receive(TimeSpan.FromSeconds(30));
>             _log.Debug("Received message");
>             StopService(ActiveMqMaster);
>             _log.Debug("Commiting transaction");
>             try
>             {
>                 session.Commit();
>             }
>             catch (Exception ex)
>             {
>                 _log.ErrorFormat("Exception: {0}", Truncate(ex.ToString()));
>                 // Logged before the call so the attempt is always visible,
>                 // not only when the rollback itself fails.
>                 _log.Debug("Rollback transaction");
>                 try
>                 {
>                     session.Rollback();
>                 }
>                 catch (Exception einner)
>                 {
>                     _log.ErrorFormat("Exception: {0}",
>                         Truncate(einner.ToString()));
>                 }
>             }
>             StartService(ActiveMqMaster);
>             var message = consumer.Receive(TimeSpan.FromSeconds(30));
>             Assert.That(message, Is.Not.Null, "message was not redilivered");
>         }
>         // Returns at most MaxLoggedExceptionLength characters of text.
>         // Substring(0, 250) alone throws ArgumentOutOfRangeException when the
>         // text is shorter than 250 characters.
>         private static string Truncate(string text)
>         {
>             return text.Length <= MaxLoggedExceptionLength
>                 ? text
>                 : text.Substring(0, MaxLoggedExceptionLength);
>         }
>         // Starts the service unless it is running or already starting, then
>         // blocks until it reports Running.
>         private void StartService(ServiceController service)
>         {
>             // Re-read the state so the decision is not based on a stale value.
>             service.Refresh();
>             var status = service.Status;
>             if (status != ServiceControllerStatus.Running &&
>                 status != ServiceControllerStatus.StartPending)
>             {
>                 service.Start();
>             }
>             service.WaitForStatus(ServiceControllerStatus.Running);
>             _log.Debug("Started Broker");
>         }
>         // Stops the service unless it is stopped or already stopping, then
>         // blocks until it reports Stopped.
>         private void StopService(ServiceController service)
>         {
>             service.Refresh();
>             var status = service.Status;
>             if (status != ServiceControllerStatus.Stopped &&
>                 status != ServiceControllerStatus.StopPending)
>             {
>                 service.Stop();
>             }
>             service.WaitForStatus(ServiceControllerStatus.Stopped);
>             _log.Debug("Stopped Broker");
>         }
>         // Sends one text message ("1") to the input queue and commits it.
>         private void SendMessageToQueue()
>         {
>             using (var session = _connection.CreateSession())
>             using (var producer = session.CreateProducer(
>                 SessionUtil.GetDestination(session, InQ)))
>             {
>                 producer.Send(producer.CreateTextMessage(1.ToString()));
>                 session.Commit();
>             }
>             _log.Debug("Primed Input Queue");
>         }
>         // Deletes the input queue so each run starts without old messages.
>         private void DeleteQueue()
>         {
>             using (var session = _connection.CreateSession())
>             {
>                 SessionUtil.DeleteDestination(session, InQ);
>             }
>         }
>         [SetUp]
>         public void TestSetup()
>         {
>             // "mm" is minutes; the previous "MM" printed the month instead.
>             LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(
>                 LogLevel.Debug, true, true, true, "HH:mm:ss");
>             _log = LogManager.GetLogger(typeof (BrokerRestart).Name);
>             var factory =
>                 new ConnectionFactory(@"failover:(tcp://localhost:61616)")
>             {
>                 AcknowledgementMode = AcknowledgementMode.Transactional,
>                 RedeliveryPolicy = new RedeliveryPolicy
>                 {
>                     InitialRedeliveryDelay = 0,
>                     MaximumRedeliveries = 3,
>                     BackOffMultiplier = 0,
>                     UseExponentialBackOff = false
>                 },
>                 AsyncSend = false,
>                 PrefetchPolicy = new PrefetchPolicy {All = 5}
>             };
>             _connection = factory.CreateConnection();
>             _connection.Start();
>             //Tracer.Trace = new CommonLoggingTraceAdapter();
>         }
>         // Releases the connection created in TestSetup; it may be null when
>         // TestSetup failed before the connection was created.
>         [TearDown]
>         public void TestTearDown()
>         {
>             if (_connection != null)
>             {
>                 _connection.Dispose();
>                 _connection = null;
>             }
>         }
>         protected ServiceController ActiveMqMaster =
>             new ServiceController(@"ActiveMQ");
>         //protected ServiceController ActiveMqSlave = new ServiceController(@"ActiveMQSlave");
>         private IConnection _connection;
>         private const string InQ = "integration-test-q";
>         private ILog _log;
>     }
> {code}
> {code:title=Passing Test With Patch|borderStyle=solid} 
> using System;
> using System.Configuration;
> using System.ServiceProcess;
> using System.Threading;
> using System.Threading.Tasks;
> using Apache.NMS;
> using Apache.NMS.ActiveMQ;
> using Apache.NMS.Policies;
> using Apache.NMS.Util;
> using Common.Logging;
> using Common.Logging.Simple;
> using NUnit.Framework;
> namespace IntegrationTests.ApacheNms.Jira
> {
>     // Integration test for AMQNET-471: a commit is attempted on a background
>     // task while the broker service is stopped, the broker is restarted, and
>     // the test then checks that a message is received again on the same
>     // consumer. Service names, machine name and the connection string are
>     // read from the application configuration.
>     [TestFixture]
>     public class BrokerRestart
>     {
>         // Maximum number of exception characters written to the log.
>         private const int MaxLoggedExceptionLength = 250;
>         //AMQNET-471
>         [Test]
>         public void
>             Message_should_be_redilivered_if_broker_is_down_and_try_to_commit()
>         {
>             var session =
>                 _connection.CreateSession(AcknowledgementMode.Transactional);
>             var consumer = session.CreateConsumer(
>                 SessionUtil.GetDestination(session, InQueue));
>             SendMessageToQueue();
>             // First delivery; its commit is attempted while the broker is down.
>             consumer.Receive(TimeSpan.FromSeconds(5));
>             StopService(ActiveMqMaster);
>             var commiter = TryCommit(session);
>             StartService(ActiveMqMaster);
>             commiter.Wait();
>             var message = consumer.Receive(TimeSpan.FromSeconds(5));
>             TryCommit(session).Wait();
>             Assert.That(message, Is.Not.Null, "message was not redilivered");
>             Assert.That(CountMessagesInQueue(OutQueue), Is.EqualTo(0));
>         }
>         //Commit blocks if the broker is down with the patch for AMQNET-471
>         // The commit therefore runs on a separate task. If it throws, the
>         // exception is logged and a rollback is attempted. The returned task
>         // completes once the commit (and any rollback) has finished.
>         private Task TryCommit(ISession session)
>         {
>             var task = Task.Factory.StartNew(() =>
>             {
>                 try
>                 {
>                     session.Commit();
>                 }
>                 catch (Exception ex)
>                 {
>                     _log.ErrorFormat("Exception: {0}",
>                         Truncate(ex.ToString()));
>                     // Logged before the call so the attempt is always
>                     // visible, not only when the rollback itself fails.
>                     _log.Debug("Rollback transaction");
>                     try
>                     {
>                         session.Rollback();
>                     }
>                     catch (Exception einner)
>                     {
>                         _log.ErrorFormat("Exception: {0}",
>                             Truncate(einner.ToString()));
>                     }
>                 }
>             });
>             //Give it a chance to start.
>             Thread.Sleep(1000);
>             return task;
>         }
>         // Returns at most MaxLoggedExceptionLength characters of text.
>         // Substring(0, 250) alone throws ArgumentOutOfRangeException when the
>         // text is shorter than 250 characters.
>         private static string Truncate(string text)
>         {
>             return text.Length <= MaxLoggedExceptionLength
>                 ? text
>                 : text.Substring(0, MaxLoggedExceptionLength);
>         }
>         // Receives from the given queue until a 2 second receive returns
>         // null and reports how many messages were received. The session is
>         // disposed without calling Commit.
>         private int CountMessagesInQueue(string queue)
>         {
>             var count = 0;
>             using (var session =
>                 _connection.CreateSession(AcknowledgementMode.Transactional))
>             using (var consumerIn = session.CreateConsumer(
>                 SessionUtil.GetDestination(session, queue)))
>             {
>                 while (consumerIn.Receive(TimeSpan.FromSeconds(2)) != null)
>                 {
>                     count++;
>                 }
>             }
>             return count;
>         }
>         // Starts the service unless it is running or already starting, then
>         // blocks until it reports Running. The previous condition
>         // (!= Running || == StartPending) issued Start() while a start was
>         // already pending.
>         private void StartService(ServiceController service)
>         {
>             // Re-read the state so the decision is not based on a stale value.
>             service.Refresh();
>             var status = service.Status;
>             if (status != ServiceControllerStatus.Running &&
>                 status != ServiceControllerStatus.StartPending)
>             {
>                 service.Start();
>             }
>             service.WaitForStatus(ServiceControllerStatus.Running);
>             _log.Debug("Started Broker");
>         }
>         // Stops the service unless it is stopped or already stopping, then
>         // blocks until it reports Stopped.
>         private void StopService(ServiceController service)
>         {
>             service.Refresh();
>             var status = service.Status;
>             if (status != ServiceControllerStatus.Stopped &&
>                 status != ServiceControllerStatus.StopPending)
>             {
>                 service.Stop();
>             }
>             service.WaitForStatus(ServiceControllerStatus.Stopped);
>             _log.Debug("Stopped Broker");
>         }
>         // Sends one text message ("1") to the input queue and commits it.
>         private void SendMessageToQueue()
>         {
>             using (var session = _connection.CreateSession())
>             using (var producer = session.CreateProducer(
>                 SessionUtil.GetDestination(session, InQueue)))
>             {
>                 producer.Send(producer.CreateTextMessage(1.ToString()));
>                 session.Commit();
>             }
>             _log.Debug("Primed Input Queue");
>         }
>         // Deletes the named queue so each run starts without old messages.
>         private void DeleteQueue(string queue)
>         {
>             using (var session = _connection.CreateSession())
>             {
>                 SessionUtil.DeleteDestination(session, queue);
>             }
>         }
>         [SetUp]
>         public void TestSetup()
>         {
>             // "mm" is minutes; the previous "MM" printed the month instead.
>             LogManager.Adapter = new ConsoleOutLoggerFactoryAdapter(
>                 LogLevel.Debug, true, true, true, "HH:mm:ss");
>             _log = LogManager.GetLogger(typeof(BrokerRestart).Name);
>             StartService(ActiveMqMaster);
>             _factory = new ConnectionFactory(ActiveMqConnectionString)
>             {
>                 AcknowledgementMode = AcknowledgementMode.Transactional,
>                 RedeliveryPolicy = new RedeliveryPolicy
>                 {
>                     InitialRedeliveryDelay = 0,
>                     MaximumRedeliveries = 3,
>                     BackOffMultiplier = 0,
>                     UseExponentialBackOff = false
>                 },
>                 AsyncSend = false,
>                 PrefetchPolicy = new PrefetchPolicy { All = 5 }
>             };
>             _connection = _factory.CreateConnection();
>             _log.Debug("Starting connection");
>             _connection.Start();
>             _log.Debug("Connection established");
>             DeleteQueue(InQueue);
>             DeleteQueue(OutQueue);
>             //Tracer.Trace = new CommonLoggingTraceAdapter();
>         }
>         // Releases the connection created in TestSetup; it may be null when
>         // TestSetup failed before the connection was created.
>         [TearDown]
>         public void TestTearDown()
>         {
>             if (_connection != null)
>             {
>                 _connection.Dispose();
>                 _connection = null;
>             }
>         }
>         protected ServiceController ActiveMqMaster = new ServiceController(
>             ActiveMasterServiceName, ActiveMqMachineName);
>         protected ServiceController ActiveMqSlave = new ServiceController(
>             ActiveMqSlaveServiceName, ActiveMqMachineName);
>         private static readonly string ActiveMqMachineName =
>             ConfigurationManager.AppSettings["ActiveMqServiceMachineName"];
>         private static readonly string ActiveMqConnectionString =
>             ConfigurationManager.ConnectionStrings["ActiveMqServer"]
>                 .ConnectionString;
>         private static readonly string ActiveMasterServiceName =
>             ConfigurationManager.AppSettings["ActiveMqMasterName"];
>         private static readonly string ActiveMqSlaveServiceName =
>             ConfigurationManager.AppSettings["ActiveMqSlaveName"];
>         private IConnection _connection;
>         private const string InQueue = "integration-test-q-in";
>         private const string OutQueue = "integration-test-q-out";
>         private ILog _log;
>         private ConnectionFactory _factory;
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to