Imran created AMQNET-471:
----------------------------

             Summary: Synchronous message consumer will lose a message that 
failed to commit whilst the broker was unavailable
                 Key: AMQNET-471
                 URL: https://issues.apache.org/jira/browse/AMQNET-471
             Project: ActiveMQ .Net
          Issue Type: Bug
          Components: ActiveMQ
    Affects Versions: 1.6.2
            Reporter: Imran
            Assignee: Jim Gomes


If the broker is down then the client can not commit the current message. An 
exception is thrown by the library. This is the behavior you would expect.

If you then try and rollback the transaction on the session due to the 
exception and resume message consumption, the rolled back message will never be 
redelivered.

{code:title=Bar.java|borderStyle=solid} 
 [TestFixture, Explicit]
    public class BrokerRestart
    {
        [Test]
        public void 
Message_should_be_redilivered_if_broker_is_down_and_try_commit()
        {
            StartService(ActiveMqMaster);
            DeleteQueue();
            SendMessageToQueue();
            var session = 
_connection.CreateSession(AcknowledgementMode.Transactional);
            var consumer = 
session.CreateConsumer(SessionUtil.GetDestination(session, InQ));

            var message = consumer.Receive(TimeSpan.FromSeconds(30));
            _log.Debug("Received message");
            StopService(ActiveMqMaster);
            _log.Debug("Commiting transaction");
            try
            {
                session.Commit();
            }
            catch (Exception ex)
            {
                _log.ErrorFormat("Exception: {0}", ex.ToString().Substring(0, 
250));
                try
                {
                    session.Rollback();
                }
                catch (Exception einner)
                {
                    _log.Debug("Rollback transaction");
                    _log.ErrorFormat("Exception: {0}", 
einner.ToString().Substring(0, 250));
                }
            }
            StartService(ActiveMqMaster);
            message = consumer.Receive(TimeSpan.FromSeconds(30));

            Assert.That(message, Is.Not.Null, "message was not redilivered");
        }

        private void StartService(ServiceController service)
        {
            if (service.Status != ServiceControllerStatus.Running)
                service.Start();
            service.WaitForStatus(ServiceControllerStatus.Running);
            _log.Debug("Started Broker");
        }

        private void StopService(ServiceController service)
        {
            if (service.Status != ServiceControllerStatus.Stopped)
                service.Stop();
            service.WaitForStatus(ServiceControllerStatus.Stopped);
            _log.Debug("Stopped Broker Broker");
        }

        private void SendMessageToQueue()
        {
            using (var session = _connection.CreateSession())
            using (var producer = 
session.CreateProducer(SessionUtil.GetDestination(session, InQ)))
            {
                producer.Send(producer.CreateTextMessage(1.ToString()));
                session.Commit();
            }
            _log.Debug("Primed Input Queue");
        }

        private void DeleteQueue()
        {
            using (var session = _connection.CreateSession())
            {
                SessionUtil.DeleteDestination(session, InQ);
            }
        }

        [SetUp]
        public void TestSetup()
        {
            LogManager.Adapter = new 
ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss");
            _log = LogManager.GetLogger(typeof (BrokerRestart).Name);
            var factory = new 
ConnectionFactory(@"failover:(tcp://localhost:61616)")
            {
                AcknowledgementMode = AcknowledgementMode.Transactional,
                RedeliveryPolicy = new RedeliveryPolicy { 
InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, 
UseExponentialBackOff = false },
                AsyncSend = false,
            };
            _connection = factory.CreateConnection();
            _connection.Start();
            //Tracer.Trace = new CommonLoggingTraceAdapter();
        }

        protected ServiceController ActiveMqMaster = new 
ServiceController(@"ActiveMQ");
        //protected ServiceController ActiveMqSlave = new 
ServiceController(@"ActiveMQSlave");
        private IConnection _connection;
        private const string InQ = "integration-test-q";
        private ILog _log;
    }
{code}



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)

Reply via email to