Remo Gloor created AMQNET-405:
---------------------------------
Summary: Messages are not dequeued when using Two Phase Commit for
DTC Transaction
Key: AMQNET-405
URL: https://issues.apache.org/jira/browse/AMQNET-405
Project: ActiveMQ .Net
Issue Type: Bug
Components: ActiveMQ
Affects Versions: 1.5.6
Environment: Windows 7, AMQ 5.7
Reporter: Remo Gloor
Assignee: Jim Gomes
Priority: Critical
Messages are not removed from the queue even though they are processes
correctly in the following scenaro:
Create a NetTxConnection and NetTxSession. Create a consumer for some queue and
register the Listener event. Create a new TransactionScope in the event
handler. Inside the consumer you send a new message to some other queue using
the session of the consumer. Do now some action that uses the Distributed
Transaction from another system (e.g. writing to a database) to force that two
phase commit is used instead of Single Phase Commit. Complete the and dispose
the TransactionScope.
I'd expect that that the sent message is delivered to the queue, the database
is modified and the received message is removed from the input queue. But the
behavior is that the last expectation is not met. The message remains in the
input queue.
If you use an own session for sending the messages the problem does not occur.
But this has the disadvantage the you will always do two phase commits even if
AMQ is the only system that takes part in the Distrubuted Transaction.
Code demonstrating the problem:
namespace ConsoleApplication1
{
using System;
using System.Transactions;
using Apache.NMS;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.Util;
class Program
{
private static INetTxSession activeMqSession;
private static IMessageConsumer consumer;
private static INetTxConnection connection;
static void Main(string[] args)
{
using (connection = CreateActiveMqConnection())
using (activeMqSession = connection.CreateNetTxSession())
using (consumer =
activeMqSession.CreateConsumer(SessionUtil.GetQueue(activeMqSession,
"queue://foo.bar")))
{
consumer.Listener += OnMessage;
connection.Start();
Console.WriteLine("Started");
Console.WriteLine("Press any key to exit");
Console.ReadKey();
}
}
private static INetTxConnection CreateActiveMqConnection()
{
var connectionFactory = new
Apache.NMS.ActiveMQ.NetTxConnectionFactory("activemq:tcp://localhost:61616")
{
AcknowledgementMode = AcknowledgementMode.Transactional
};
return connectionFactory.CreateNetTxConnection();
}
private static void OnMessage(IMessage message)
{
var x = new TestSinglePhaseCommit();
using (var session2 = connection.CreateNetTxSession())
using (var tx = new
TransactionScope(TransactionScopeOption.RequiresNew))
{
Console.WriteLine("Message Received");
// Force two phase commit, In reality this happens by using
another system that takes part in the
// distributed transaction like a database.
Transaction.Current.EnlistDurable(Guid.NewGuid(), x,
EnlistmentOptions.None);
// The proble occurs only if a message is sent using the same
session like the receiver
using (var producer =
session2.CreateProducer(SessionUtil.GetQueue(session2, "queue://foo.baz")))
{
producer.Send(new ActiveMQTextMessage("foo"));
}
tx.Complete();
}
}
}
internal class TestSinglePhaseCommit : ISinglePhaseNotification
{
public void Prepare(PreparingEnlistment preparingEnlistment)
{
preparingEnlistment.Prepared();
}
public void Commit(Enlistment enlistment)
{
enlistment.Done();
}
public void Rollback(Enlistment enlistment)
{
enlistment.Done();
}
public void InDoubt(Enlistment enlistment)
{
enlistment.Done();
}
public void SinglePhaseCommit(SinglePhaseEnlistment
singlePhaseEnlistment)
{
singlePhaseEnlistment.Committed();
}
}
}
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira