Imran created AMQNET-474:
----------------------------
Summary: DTC Consumer is forcibly closed if a transaction is in
progress and connection to the broker is interrupted
Key: AMQNET-474
URL: https://issues.apache.org/jira/browse/AMQNET-474
Project: ActiveMQ .Net
Issue Type: Bug
Components: ActiveMQ
Affects Versions: 1.6.2
Reporter: Imran
Assignee: Jim Gomes
DTC Consumer is forcibly closed if a transaction is in progress and the
connection to the broker is interrupted. This behavior is different to non DTC
consumers. This happens with a fail over connection specified which is not the
correct behavior as you would expect the fail over feature to reestablish the
connection on behalf of the client.
{code}
using System;
using System.ServiceProcess;
using System.Transactions;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
using Apache.NMS.Policies;
using Apache.NMS.Util;
using Common.Logging;
using Common.Logging.Simple;
using NUnit.Framework;
namespace IntegrationTests.ApacheNms.Tests.Jira.DistributedTransaction
{
[TestFixture]
public class BrokerRestartAndFailover
{
[Test, Explicit("After a broker restart the consumer is forcibly
closed. This is not desirable as this behaviour is different to non dtc
consumers.")]
public void Should_rediliver_message_after_broker_restart()
{
SendMessageToQueue("1");
var session =
_connection.CreateSession(AcknowledgementMode.Transactional);
var consumer =
session.CreateConsumer(SessionUtil.GetDestination(session, InQueue));
var transaction = new
TransactionScope(TransactionScopeOption.RequiresNew);
consumer.Receive(TimeSpan.FromSeconds(1));
StopService(ActiveMqMaster);
StartService(ActiveMqMaster);
transaction.Complete();
transaction.Dispose();
//try a few times to drain the queue
var messageRedilivered = 0;
for (var i = 0; i < 2; i++)
{
transaction = new
TransactionScope(TransactionScopeOption.RequiresNew);
try
{
var message = consumer.Receive(TimeSpan.FromSeconds(1));
transaction.Complete();
if (message != null)
messageRedilivered++;
}
catch (Exception ex)
{
LogManager.GetCurrentClassLogger().Error(ex);
}
finally
{
transaction.Dispose();
}
}
Assert.That(CountMessagesInQueue(InQueue), Is.EqualTo(0));
Assert.That(messageRedilivered, Is.EqualTo(1));
}
public int CountMessagesInQueue(string queue)
{
var factory = new ConnectionFactory(ConnectionString)
{
AcknowledgementMode = AcknowledgementMode.Transactional
};
var count = 0;
using (var connection = factory.CreateConnection())
using (var session = connection.CreateSession())
using (var consumer =
session.CreateConsumer(SessionUtil.GetQueue(session, queue)))
{
connection.Start();
while (true)
{
var message = consumer.Receive(TimeSpan.FromSeconds(1));
if (message == null)
break;
count++;
}
}
return count;
}
private void DeleteQueue(string queue)
{
using (var session = _connection.CreateSession())
{
SessionUtil.DeleteDestination(session, queue);
}
}
private void SendMessageToQueue(string message)
{
using (var session = _connection.CreateSession())
using (var producer =
session.CreateProducer(SessionUtil.GetDestination(session, InQueue)))
using (var scope = new
TransactionScope(TransactionScopeOption.RequiresNew))
{
producer.Send(producer.CreateTextMessage(message));
scope.Complete();
}
Log.Debug("Primed Input Queue");
}
private void StartService(ServiceController service)
{
if(service.Status != ServiceControllerStatus.Running)
service.Start();
service.WaitForStatus(ServiceControllerStatus.Running);
}
private void StopService(ServiceController service)
{
if (service.Status != ServiceControllerStatus.Stopped)
service.Stop();
service.WaitForStatus(ServiceControllerStatus.Stopped);
}
[SetUp]
public void TestSetup()
{
LogManager.Adapter = new
ConsoleOutLoggerFactoryAdapter(LogLevel.Debug, true, true, true, "HH:MM:ss");
StartService(ActiveMqMaster);
StopService(ActiveMqSlave);
_connectionFactory = new NetTxConnectionFactory(ConnectionString)
{
AcknowledgementMode = AcknowledgementMode.Transactional,
RedeliveryPolicy = new RedeliveryPolicy {
InitialRedeliveryDelay = 10, MaximumRedeliveries = 3, BackOffMultiplier = 0,
UseExponentialBackOff = false },
DispatchAsync = true,
AsyncSend = false,
PrefetchPolicy = new PrefetchPolicy { All = 10 },
};
_connection = _connectionFactory.CreateConnection();
_connection.ConnectionInterruptedListener += () =>
LogManager.GetCurrentClassLogger().Debug("Connection interrupted");
_connection.ConnectionResumedListener += () =>
LogManager.GetCurrentClassLogger().Debug("Connection resumed");
_connection.ExceptionListener += ex =>
LogManager.GetCurrentClassLogger().ErrorFormat("Connection exception: '{0}'",
ex.ToString());
_connection.Start();
DeleteQueue(InQueue);
DeleteQueue(OutQueue);
}
[TearDown]
public void TestTeardown()
{
StartService(ActiveMqMaster);
StopService(ActiveMqSlave);
}
private const string ConnectionString =
@"failover:(tcp://localhost:61616)";
protected ServiceController ActiveMqMaster = new
ServiceController(@"ActiveMQ");
protected ServiceController ActiveMqSlave = new
ServiceController(@"ActiveMQSlave");
private IConnection _connection;
private const string InQueue = "in-q";
private const string OutQueue = "out-q";
private static readonly ILog Log =
LogManager.GetLogger(typeof(BrokerRestartAndFailover).Name);
private NetTxConnectionFactory _connectionFactory;
}
}
{code}
--
This message was sent by Atlassian JIRA
(v6.2#6252)