Remo Gloor created AMQNET-413:
---------------------------------
Summary: Message producers do not respect DTC Transactions
correctly
Key: AMQNET-413
URL: https://issues.apache.org/jira/browse/AMQNET-413
Project: ActiveMQ .Net
Issue Type: Bug
Reporter: Remo Gloor
Assignee: Jim Gomes
Priority: Critical
When a message is consumed in a transaction, new messages are sent while that
message is being processed, and the transaction is rolled back and then
committed on retry, the number of published messages should be equal to the
number of received messages.
However, the number of sent messages is larger than the number of received ones.
This means that some of the message sends are rolled back while others are not.
The following application demonstrates the problem when 100+ messages are
enqueued to foo.bar:
// Reproduction program for AMQNET-413: consumes from queue://foo.bar inside a
// TransactionScope and, while handling each message, sends a message to
// queue://foo.baz on the same NetTx session.
class Program
{
// Session, consumer and connection are kept in static fields so that OnMessage
// can reach the same session that Main consumes from.
private static INetTxSession activeMqSession;
private static IMessageConsumer consumer;
private static INetTxConnection connection;
// Entry point: opens the connection, session and consumer, then loops forever,
// handling one message per TransactionScope.
static void Main(string[] args)
{
using (connection = CreateActiveMqConnection())
using (activeMqSession = connection.CreateNetTxSession())
using (consumer =
activeMqSession.CreateConsumer(SessionUtil.GetQueue(activeMqSession,
"queue://foo.bar")))
{
connection.Start();
// No exit condition: the loop runs until the process is terminated.
while (true)
{
try
{
// Each iteration runs in its own, new ambient transaction.
using (TransactionScope scoped = new
TransactionScope(TransactionScopeOption.RequiresNew))
{
IMessage msg = null;
// Polls with ReceiveNoWait until a non-null message is returned; the polling
// happens inside the transaction scope.
while (msg == null)
{
msg = consumer.ReceiveNoWait();
}
OnMessage(msg);
// Only reached when OnMessage returns normally; if it throws, the scope is
// disposed without Complete() having been called.
scoped.Complete();
}
}
// Any exception (including the deliberate one thrown by OnMessage) is discarded
// so that the loop continues with the next iteration.
catch(Exception exception) {}
}
}
}
// Builds a NetTx connection to the broker at tcp://localhost:61616 with the
// acknowledgement mode set to Transactional.
private static INetTxConnection CreateActiveMqConnection()
{
var connectionFactory = new
Apache.NMS.ActiveMQ.NetTxConnectionFactory("activemq:tcp://localhost:61616")
{
AcknowledgementMode = AcknowledgementMode.Transactional
};
return connectionFactory.CreateNetTxConnection();
}
// Handles one received message: logs it, enlists an extra durable resource in
// the ambient transaction, sends "foo" to queue://foo.baz, and throws if the
// message is not flagged as redelivered.
private static void OnMessage(IMessage message)
{
var x = new TestSinglePhaseCommit();
// Logs the message id together with the local and distributed transaction ids.
Console.WriteLine("Processing message {0} in transaction {1} -
{2}", message.NMSMessageId,
Transaction.Current.TransactionInformation.LocalIdentifier,
Transaction.Current.TransactionInformation.DistributedIdentifier);
// session2 is an alias for the same session the consumer was created on.
var session2 = activeMqSession;
{
// Enlists x as a durable resource under a fresh resource-manager Guid.
// NOTE(review): presumably done to force the transaction to be promoted to a
// distributed (DTC) transaction - confirm.
Transaction.Current.EnlistDurable(Guid.NewGuid(), x,
EnlistmentOptions.None);
// The producer is created for this single send and disposed right after it.
using (var producer =
session2.CreateProducer(SessionUtil.GetQueue(session2, "queue://foo.baz")))
{
producer.Send(new ActiveMQTextMessage("foo"));
}
// Deliberate failure when the message is not marked redelivered: the exception
// leaves the TransactionScope in Main before scoped.Complete() is called.
if (!message.NMSRedelivered) throw new Exception();
}
}
}
/// <summary>
/// Minimal transaction participant used by the reproduction: it holds no state
/// and answers every transaction-manager callback immediately and positively.
/// </summary>
internal class TestSinglePhaseCommit : ISinglePhaseNotification
{
    // ---- Single-phase path ----

    /// <summary>
    /// Single-phase commit callback; reports the enlistment as committed.
    /// </summary>
    /// <param name="singlePhaseEnlistment">Enlistment to report the outcome on.</param>
    public void SinglePhaseCommit(SinglePhaseEnlistment singlePhaseEnlistment)
    {
        singlePhaseEnlistment.Committed();
    }

    // ---- Two-phase path ----

    /// <summary>
    /// Phase-one callback; reports the enlistment as prepared.
    /// </summary>
    /// <param name="preparingEnlistment">Enlistment to report the vote on.</param>
    public void Prepare(PreparingEnlistment preparingEnlistment)
    {
        preparingEnlistment.Prepared();
    }

    /// <summary>
    /// Commit callback; there is nothing to persist, so it only signals completion.
    /// </summary>
    /// <param name="enlistment">Enlistment to mark as done.</param>
    public void Commit(Enlistment enlistment)
    {
        enlistment.Done();
    }

    /// <summary>
    /// Rollback callback; there is nothing to undo, so it only signals completion.
    /// </summary>
    /// <param name="enlistment">Enlistment to mark as done.</param>
    public void Rollback(Enlistment enlistment)
    {
        enlistment.Done();
    }

    /// <summary>
    /// In-doubt callback; signals completion without taking further action.
    /// </summary>
    /// <param name="enlistment">Enlistment to mark as done.</param>
    public void InDoubt(Enlistment enlistment)
    {
        enlistment.Done();
    }
}
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira