[
https://issues.apache.org/jira/browse/AMQNET-471?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Imran updated AMQNET-471:
-------------------------
Attachment: TransactionContext.cs.patch
> Synchronous message consumer will lose a message that failed to commit whilst
> the broker was unavailable
> --------------------------------------------------------------------------------------------------------
>
> Key: AMQNET-471
> URL: https://issues.apache.org/jira/browse/AMQNET-471
> Project: ActiveMQ .Net
> Issue Type: Bug
> Components: ActiveMQ
> Affects Versions: 1.6.2
> Reporter: Imran
> Assignee: Jim Gomes
> Attachments: TransactionContext.cs.patch
>
>
> If the broker is down then the client cannot commit the current message. An
> exception is thrown by the library. This is the behavior you would expect.
> If you then try to roll back the transaction on the session due to the
> exception and resume message consumption, the rolled-back message will never
> be redelivered.
> {code:title=FailingTest|borderStyle=solid}
> [TestFixture, Explicit]
> public class BrokerRestart
> {
> [Test]
> public void
> Message_should_be_redilivered_if_broker_is_down_and_try_commit()
> {
> StartService(ActiveMqMaster);
> DeleteQueue();
> SendMessageToQueue();
> var session =
> _connection.CreateSession(AcknowledgementMode.Transactional);
> var consumer =
> session.CreateConsumer(SessionUtil.GetDestination(session, InQ));
> var message = consumer.Receive(TimeSpan.FromSeconds(30));
> _log.Debug("Received message");
> StopService(ActiveMqMaster);
> _log.Debug("Commiting transaction");
> try
> {
> session.Commit();
> }
> catch (Exception ex)
> {
> _log.ErrorFormat("Exception: {0}", ex.ToString().Substring(0,
> 250));
> try
> {
> session.Rollback();
> }
> catch (Exception einner)
> {
> _log.Debug("Rollback transaction");
> _log.ErrorFormat("Exception: {0}",
> einner.ToString().Substring(0, 250));
> }
> }
> StartService(ActiveMqMaster);
> message = consumer.Receive(TimeSpan.FromSeconds(30));
> Assert.That(message, Is.Not.Null, "message was not redilivered");
> }
> private void StartService(ServiceController service)
> {
> if (service.Status != ServiceControllerStatus.Running)
> service.Start();
> service.WaitForStatus(ServiceControllerStatus.Running);
> _log.Debug("Started Broker");
> }
> private void StopService(ServiceController service)
> {
> if (service.Status != ServiceControllerStatus.Stopped)
> service.Stop();
> service.WaitForStatus(ServiceControllerStatus.Stopped);
> _log.Debug("Stopped Broker Broker");
> }
> private void SendMessageToQueue()
> {
> using (var session = _connection.CreateSession())
> using (var producer =
> session.CreateProducer(SessionUtil.GetDestination(session, InQ)))
> {
> producer.Send(producer.CreateTextMessage(1.ToString()));
> session.Commit();
> }
> _log.Debug("Primed Input Queue");
> }
> private void DeleteQueue()
> {
> using (var session = _connection.CreateSession())
> {
> SessionUtil.DeleteDestination(session, InQ);
> }
> }
> [SetUp]
> public void TestSetup()
> {
> LogManager.Adapter = new
> ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss");
> _log = LogManager.GetLogger(typeof (BrokerRestart).Name);
> var factory = new
> ConnectionFactory(@"failover:(tcp://localhost:61616)")
> {
> AcknowledgementMode = AcknowledgementMode.Transactional,
> RedeliveryPolicy = new RedeliveryPolicy {
> InitialRedeliveryDelay = 0, MaximumRedeliveries = 3, BackOffMultiplier = 0,
> UseExponentialBackOff = false },
> AsyncSend = false,
> PrefetchPolicy = new PrefetchPolicy {All = 5}
> };
> _connection = factory.CreateConnection();
> _connection.Start();
> //Tracer.Trace = new CommonLoggingTraceAdapter();
> }
> protected ServiceController ActiveMqMaster = new
> ServiceController(@"ActiveMQ");
> //protected ServiceController ActiveMqSlave = new
> ServiceController(@"ActiveMQSlave");
> private IConnection _connection;
> private const string InQ = "integration-test-q";
> private ILog _log;
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.2#6252)