[ 
https://issues.apache.org/jira/browse/AMQNET-471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13919637#comment-13919637
 ] 

Imran edited comment on AMQNET-471 at 3/4/14 5:18 PM:
------------------------------------------------------

I see, yes this is a single consumer for the purpose of the test. I am 
expecting the commit to fail and then after the rollback, when the service 
comes back up the message should be redelivered. I couldn't see anything wrong 
with my client code as the client should just be able to consume the message 
again when it is redelivered. I can see why this exception is thrown on commit, 
since you can't commit if you have no connection but then shouldn't the 
rollback clear the message from the client cache and just allow it to be 
redelivered by the broker?

I have spent some time looking at the code and also comparing it to the java 
source. There is a lot going on in the source and I don't see any obvious 
differences between the java and .net code so i'm not sure I will be able to 
fix this one myself. I think the java client therefore may have the same issue?


was (Author: imran99):
I see, yes this is a single consumer for the purpose of the test. I am 
expecting the commit to fail and then after the rollback, when the service 
comes back up the message should be redelivered. I couldn't see anything wrong 
with my client code as the client should just be able to consume the message 
again when it is redelivered. I can see why this exception is thrown on commit, 
since you can't commit if you have no connection but then shouldn't the 
rollback clear the message from the client cache and just allow it to be 
redelivered by the broker?

I have spent some time looking at the code and also comparing it to the java 
source. There is a lot going on in the source and I don't see any obvious 
differences between the java and .net code so i'm not sure I will be able to 
fix this one myself.

> Synchronous message consumer will lose a message that failed to commit whilst 
> the broker was unavailable
> --------------------------------------------------------------------------------------------------------
>
>                 Key: AMQNET-471
>                 URL: https://issues.apache.org/jira/browse/AMQNET-471
>             Project: ActiveMQ .Net
>          Issue Type: Bug
>          Components: ActiveMQ
>    Affects Versions: 1.6.2
>            Reporter: Imran
>            Assignee: Jim Gomes
>
> If the broker is down then the client can not commit the current message. An 
> exception is thrown by the library. This is the behavior you would expect.
> If you then try and rollback the transaction on the session due to the 
> exception and resume message consumption, the rolled back message will never 
> be redelivered.
> {code:title=FailingTest|borderStyle=solid} 
>  [TestFixture, Explicit]
>     public class BrokerRestart
>     {
>         [Test]
>         public void 
> Message_should_be_redilivered_if_broker_is_down_and_try_commit()
>         {
>             StartService(ActiveMqMaster);
>             DeleteQueue();
>             SendMessageToQueue();
>             var session = 
> _connection.CreateSession(AcknowledgementMode.Transactional);
>             var consumer = 
> session.CreateConsumer(SessionUtil.GetDestination(session, InQ));
>             var message = consumer.Receive(TimeSpan.FromSeconds(30));
>             _log.Debug("Received message");
>             StopService(ActiveMqMaster);
>             _log.Debug("Commiting transaction");
>             try
>             {
>                 session.Commit();
>             }
>             catch (Exception ex)
>             {
>                 _log.ErrorFormat("Exception: {0}", ex.ToString().Substring(0, 
> 250));
>                 try
>                 {
>                     session.Rollback();
>                 }
>                 catch (Exception einner)
>                 {
>                     _log.Debug("Rollback transaction");
>                     _log.ErrorFormat("Exception: {0}", 
> einner.ToString().Substring(0, 250));
>                 }
>             }
>             StartService(ActiveMqMaster);
>             message = consumer.Receive(TimeSpan.FromSeconds(30));
>             Assert.That(message, Is.Not.Null, "message was not redilivered");
>         }
>         private void StartService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Running)
>                 service.Start();
>             service.WaitForStatus(ServiceControllerStatus.Running);
>             _log.Debug("Started Broker");
>         }
>         private void StopService(ServiceController service)
>         {
>             if (service.Status != ServiceControllerStatus.Stopped)
>                 service.Stop();
>             service.WaitForStatus(ServiceControllerStatus.Stopped);
>             _log.Debug("Stopped Broker Broker");
>         }
>         private void SendMessageToQueue()
>         {
>             using (var session = _connection.CreateSession())
>             using (var producer = 
> session.CreateProducer(SessionUtil.GetDestination(session, InQ)))
>             {
>                 producer.Send(producer.CreateTextMessage(1.ToString()));
>                 session.Commit();
>             }
>             _log.Debug("Primed Input Queue");
>         }
>         private void DeleteQueue()
>         {
>             using (var session = _connection.CreateSession())
>             {
>                 SessionUtil.DeleteDestination(session, InQ);
>             }
>         }
>         [SetUp]
>         public void TestSetup()
>         {
>             LogManager.Adapter = new 
> ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss");
>             _log = LogManager.GetLogger(typeof (BrokerRestart).Name);
>             var factory = new 
> ConnectionFactory(@"failover:(tcp://localhost:61616)")
>             {
>                 AcknowledgementMode = AcknowledgementMode.Transactional,
>                 RedeliveryPolicy = new RedeliveryPolicy { 
> InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0, 
> UseExponentialBackOff = false },
>                 AsyncSend = false,
>             };
>             _connection = factory.CreateConnection();
>             _connection.Start();
>             //Tracer.Trace = new CommonLoggingTraceAdapter();
>         }
>         protected ServiceController ActiveMqMaster = new 
> ServiceController(@"ActiveMQ");
>         //protected ServiceController ActiveMqSlave = new 
> ServiceController(@"ActiveMQSlave");
>         private IConnection _connection;
>         private const string InQ = "integration-test-q";
>         private ILog _log;
>     }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to