Remo Gloor created AMQNET-405:
---------------------------------

             Summary: Messages are not dequeued when using Two Phase Commit for 
DTC Transaction
                 Key: AMQNET-405
                 URL: https://issues.apache.org/jira/browse/AMQNET-405
             Project: ActiveMQ .Net
          Issue Type: Bug
          Components: ActiveMQ
    Affects Versions: 1.5.6
         Environment: Windows 7, AMQ 5.7
            Reporter: Remo Gloor
            Assignee: Jim Gomes
            Priority: Critical


Messages are not removed from the queue even though they are processed 
correctly in the following scenario:

Create a NetTxConnection and a NetTxSession. Create a consumer for some queue 
and register a handler for its Listener event. In the event handler, create a 
new TransactionScope and send a new message to some other queue using the 
session of the consumer. Then perform some action that enlists another system 
in the distributed transaction (e.g. writing to a database) to force Two Phase 
Commit to be used instead of Single Phase Commit. Complete and dispose the 
TransactionScope.

I'd expect that the sent message is delivered to the queue, the database is 
modified, and the received message is removed from the input queue. However, 
the last expectation is not met: the message remains in the input queue.

If you use a separate session for sending the messages, the problem does not 
occur. But this has the disadvantage that you will always do two phase commits, 
even if AMQ is the only system that takes part in the distributed transaction.

Code demonstrating the problem:

namespace ConsoleApplication1
{
    using System;
    using System.Transactions;

    using Apache.NMS;
    using Apache.NMS.ActiveMQ.Commands;
    using Apache.NMS.Util;

    class Program
    {
        private static INetTxSession activeMqSession;

        private static IMessageConsumer consumer;

        private static INetTxConnection connection;

        static void Main(string[] args)
        {
            using (connection = CreateActiveMqConnection())
            using (activeMqSession = connection.CreateNetTxSession())
            using (consumer = activeMqSession.CreateConsumer(
                SessionUtil.GetQueue(activeMqSession, "queue://foo.bar")))
            {
                consumer.Listener += OnMessage;
                connection.Start();

                Console.WriteLine("Started");
                Console.WriteLine("Press any key to exit");
                Console.ReadKey();
            }
        }

        private static INetTxConnection CreateActiveMqConnection()
        {
            var connectionFactory = new Apache.NMS.ActiveMQ.NetTxConnectionFactory(
                "activemq:tcp://localhost:61616")
                {
                    AcknowledgementMode = AcknowledgementMode.Transactional
                };

            return connectionFactory.CreateNetTxConnection();
        }

        private static void OnMessage(IMessage message)
        {
            var x = new TestSinglePhaseCommit();
            using (var session2 = connection.CreateNetTxSession())
            using (var tx = new TransactionScope(TransactionScopeOption.RequiresNew))
            {
                Console.WriteLine("Message Received");

                // Force two phase commit. In reality this happens by using
                // another system that takes part in the distributed
                // transaction, like a database.
                Transaction.Current.EnlistDurable(Guid.NewGuid(), x, EnlistmentOptions.None);

                // The problem occurs only if a message is sent using the same
                // session as the receiver
                using (var producer = session2.CreateProducer(
                    SessionUtil.GetQueue(session2, "queue://foo.baz")))
                {
                    producer.Send(new ActiveMQTextMessage("foo"));
                }

                tx.Complete();
            }
        }
    }

    internal class TestSinglePhaseCommit : ISinglePhaseNotification
    {
        public void Prepare(PreparingEnlistment preparingEnlistment)
        {
            preparingEnlistment.Prepared();
        }

        public void Commit(Enlistment enlistment)
        {
            enlistment.Done();
        }

        public void Rollback(Enlistment enlistment)
        {
            enlistment.Done();
        }

        public void InDoubt(Enlistment enlistment)
        {
            enlistment.Done();
        }

        public void SinglePhaseCommit(SinglePhaseEnlistment singlePhaseEnlistment)
        {
            singlePhaseEnlistment.Committed();
        }
    }
}
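
For readers unfamiliar with how System.Transactions picks between Single Phase 
Commit and Two Phase Commit, here is a minimal sketch that does not involve 
ActiveMQ at all. RecordingResource and CommitProtocolDemo are hypothetical 
names made up for this illustration, and volatile enlistments are used (an 
assumption, so that no MSDTC is needed) instead of the durable enlistment in 
the code above. With one enlisted resource the log should show only a 
SinglePhaseCommit call; with two it should show Prepare calls on both.

```csharp
using System;
using System.Collections.Generic;
using System.Transactions;

// Hypothetical resource manager that records which callbacks the
// transaction manager invokes on it.
class RecordingResource : ISinglePhaseNotification
{
    private readonly string name;
    private readonly List<string> log;

    public RecordingResource(string name, List<string> log)
    {
        this.name = name;
        this.log = log;
    }

    public void Prepare(PreparingEnlistment e) { log.Add(name + ":Prepare"); e.Prepared(); }

    public void Commit(Enlistment e) { log.Add(name + ":Commit"); e.Done(); }

    public void Rollback(Enlistment e) { log.Add(name + ":Rollback"); e.Done(); }

    public void InDoubt(Enlistment e) { log.Add(name + ":InDoubt"); e.Done(); }

    public void SinglePhaseCommit(SinglePhaseEnlistment e)
    {
        log.Add(name + ":SinglePhaseCommit");
        e.Committed();
    }
}

static class CommitProtocolDemo
{
    // Enlists resourceCount volatile resources in a fresh transaction,
    // commits it, and returns the recorded callback names.
    public static List<string> Run(int resourceCount)
    {
        var log = new List<string>();
        using (var tx = new TransactionScope(TransactionScopeOption.RequiresNew))
        {
            for (int i = 0; i < resourceCount; i++)
            {
                Transaction.Current.EnlistVolatile(
                    new RecordingResource("rm" + i, log), EnlistmentOptions.None);
            }

            tx.Complete();
        }

        return log;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run(1)));
        Console.WriteLine(string.Join(", ", Run(2)));
    }
}
```

This is why sharing one session between consumer and producer matters for the 
report above: a single enlistment can be committed in one phase, while any 
additional participant forces the prepare/commit sequence.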

