[ 
https://issues.apache.org/jira/browse/AMQNET-471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13919210#comment-13919210
 ] 

Imran commented on AMQNET-471:
------------------------------

Bump! Any ideas on how to fix this one? I have looked through the source and it 
looks like the rollback executes the same 'BeforeEnd' code as commit. This is 
odd because this is the code path for acknowledgment.

An exception is thrown by the code on commit and rollback by the method 
'RollbackOnFailedRecoveryRedelivery' in the class 'MessageConsumer'. It seems 
this is being thrown because the flag 'clearDispatchList' was set by the 
Connection in its 'OnTransportInterrupted' processing.

I'm not really sure what the exception message 'post failover recovery. 1 
previously delivered message(s) not replayed to consumer: ' that is generated 
means or why this fails when the connection is down but not when the connection 
is active so it is hard for me to produce a patch for this.

> Synchronous message consumer will lose a message that failed to commit whilst 
> the broker was unavailable
> --------------------------------------------------------------------------------------------------------
>
>                 Key: AMQNET-471
>                 URL: https://issues.apache.org/jira/browse/AMQNET-471
>             Project: ActiveMQ .Net
>          Issue Type: Bug
>          Components: ActiveMQ
>    Affects Versions: 1.6.2
>            Reporter: Imran
>            Assignee: Jim Gomes
>
> If the broker is down then the client can not commit the current message. An 
> exception is thrown by the library. This is the behavior you would expect.
> If you then try and rollback the transaction on the session due to the 
> exception and resume message consumption, the rolled back message will never 
> be redelivered.
> {code:title=FailingTest|borderStyle=solid} 
>  [TestFixture, Explicit]
>     public class BrokerRestart
>     {
>         [Test]
>         public void 
> Message_should_be_redilivered_if_broker_is_down_and_try_commit()
>         {
>             StartService(ActiveMqMaster);
>             DeleteQueue();
>             SendMessageToQueue();
>             var session = 
> _connection.CreateSession(AcknowledgementMode.Transactional);
>             var consumer = 
> session.CreateConsumer(SessionUtil.GetDestination(session, InQ));
>             var message = consumer.Receive(TimeSpan.FromSeconds(30));
>             _log.Debug("Received message");
>             StopService(ActiveMqMaster);
>             _log.Debug("Commiting transaction");
>             try
>             {
>                 session.Commit();
>             }
>             catch (Exception ex)
>             {
>                 _log.ErrorFormat("Exception: {0}", ex.ToString().Substring(0, 
> 250));
>                 try
>                 {
>                     session.Rollback();
>                 }
>                 catch (Exception einner)
>                 {
>                     _log.Debug("Rollback transaction");
>                     _log.ErrorFormat("Exception: {0}", 
> einner.ToString().Substring(0, 250));
>                 }
>             }
>             StartService(ActiveMqMaster);
>             message = consumer.Receive(TimeSpan.FromSeconds(30));
>             Assert.That(message, Is.Not.Null, "message was not redilivered");
>         }
>         private void StartService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Running)
>                 service.Start();
>             service.WaitForStatus(ServiceControllerStatus.Running);
>             _log.Debug("Started Broker");
>         }
>         private void StopService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Stopped)
>                 service.Stop();
>             service.WaitForStatus(ServiceControllerStatus.Stopped);
>             _log.Debug("Stopped Broker Broker");
>         }
>         private void SendMessageToQueue()
>         {
>             using (var session = _connection.CreateSession())
>             using (var producer = 
> session.CreateProducer(SessionUtil.GetDestination(session, InQ)))
>             {
>                 producer.Send(producer.CreateTextMessage(1.ToString()));
>                 session.Commit();
>             }
>             _log.Debug("Primed Input Queue");
>         }
>         private void DeleteQueue()
>         {
>             using (var session = _connection.CreateSession())
>             {
>                 SessionUtil.DeleteDestination(session, InQ);
>             }
>         }
>         [SetUp]
>         public void TestSetup()
>         {
>             LogManager.Adapter = new 
> ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss");
>             _log = LogManager.GetLogger(typeof (BrokerRestart).Name);
>             var factory = new 
> ConnectionFactory(@"failover:(tcp://localhost:61616)")
>             {
>                 AcknowledgementMode = AcknowledgementMode.Transactional,
>                 RedeliveryPolicy = new RedeliveryPolicy { 
> InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, 
> UseExponentialBackOff = false },
>                 AsyncSend = false,
>             };
>             _connection = factory.CreateConnection();
>             _connection.Start();
>             //Tracer.Trace = new CommonLoggingTraceAdapter();
>         }
>         protected ServiceController ActiveMqMaster = new 
> ServiceController(@"ActiveMQ");
>         //protected ServiceController ActiveMqSlave = new 
> ServiceController(@"ActiveMQSlave");
>         private IConnection _connection;
>         private const string InQ = "integration-test-q";
>         private ILog _log;
>     }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to