[ https://issues.apache.org/jira/browse/AMQNET-413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13814946#comment-13814946 ]
Timothy Bish commented on AMQNET-413:
-------------------------------------
Too many changes in non-DTC code to include in a patch release. The code must
work in .NET 2.0 or be ifdef protected, and DTC code should mix with non-DTC
code as little as possible.
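
As a rough sketch of what "ifdef protected" could look like: code that needs a
newer framework sits behind a conditional compilation guard, with a fallback
that still compiles for .NET 2.0. The NET_2_0 symbol and the PendingCounter
helper are hypothetical names used for illustration only; they are not taken
from the NMS build or from the attached patches.

```csharp
using System.Collections.Generic;
#if !NET_2_0
using System.Linq; // LINQ only exists from .NET 3.5 onwards
#endif

// Hypothetical helper, for illustration only.
internal static class PendingCounter
{
    // Counts the entries that are still flagged as pending.
    public static int CountPending(IList<bool> pendingFlags)
    {
#if NET_2_0
        // .NET 2.0 path: plain loop, no LINQ and no lambdas.
        int count = 0;
        foreach (bool pending in pendingFlags)
        {
            if (pending) count++;
        }
        return count;
#else
        // Newer frameworks: the LINQ extension method is available.
        return pendingFlags.Count(pending => pending);
#endif
    }
}
```

Both branches return the same count, so callers do not need a guard of their
own.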
> Message producers do not respect DTC Transactions correctly
> -----------------------------------------------------------
>
> Key: AMQNET-413
> URL: https://issues.apache.org/jira/browse/AMQNET-413
> Project: ActiveMQ .Net
> Issue Type: Bug
> Components: ActiveMQ
> Reporter: Remo Gloor
> Assignee: Jim Gomes
> Attachments: AMQNET-413.patch,
> AllMessagesAreAcknowledgedAndRolledbackIndependentOfTheTransaction.patch,
> AllMessagesAreAcknowledgedAndRolledbackIndependentOfTheTransaction.patch,
> allDTCImprovments.patch
>
>
> When messages are consumed in a transaction, new messages are sent while
> each one is processed, and the transaction is rolled back and then committed
> on retry, the number of published messages should equal the number received.
> Instead, more messages are sent than received, which means that some of the
> sends are rolled back and others are not.
> EDIT: Further analysis has shown that TransactionContext.TransactionId
> is null when sending even though a transaction is in progress and not yet
> completed. It must be incorrectly set to null somewhere.
> The following application demonstrates the problem when 100+ messages are
> enqueued to foo.bar:
> // The using directives are inferred from the types used below.
> using System;
> using System.Transactions;
> using Apache.NMS;
> using Apache.NMS.ActiveMQ.Commands;
> using Apache.NMS.Util;
>
> class Program
> {
>     private static INetTxSession activeMqSession;
>     private static IMessageConsumer consumer;
>     private static INetTxConnection connection;
>
>     static void Main(string[] args)
>     {
>         using (connection = CreateActiveMqConnection())
>         using (activeMqSession = connection.CreateNetTxSession())
>         using (consumer = activeMqSession.CreateConsumer(
>             SessionUtil.GetQueue(activeMqSession, "queue://foo.bar")))
>         {
>             connection.Start();
>             while (true)
>             {
>                 try
>                 {
>                     using (TransactionScope scoped =
>                         new TransactionScope(TransactionScopeOption.RequiresNew))
>                     {
>                         // Poll until a message arrives.
>                         IMessage msg = null;
>                         while (msg == null)
>                         {
>                             msg = consumer.ReceiveNoWait();
>                         }
>                         OnMessage(msg);
>                         scoped.Complete();
>                     }
>                 }
>                 catch (Exception)
>                 {
>                     // Swallowed on purpose: the scope was not completed, so
>                     // the transaction rolls back and the loop retries.
>                 }
>             }
>         }
>     }
>
>     private static INetTxConnection CreateActiveMqConnection()
>     {
>         var connectionFactory =
>             new Apache.NMS.ActiveMQ.NetTxConnectionFactory("activemq:tcp://localhost:61616")
>         {
>             AcknowledgementMode = AcknowledgementMode.Transactional
>         };
>         return connectionFactory.CreateNetTxConnection();
>     }
>
>     private static void OnMessage(IMessage message)
>     {
>         var x = new TestSinglePhaseCommit();
>         Console.WriteLine("Processing message {0} in transaction {1} - {2}",
>             message.NMSMessageId,
>             Transaction.Current.TransactionInformation.LocalIdentifier,
>             Transaction.Current.TransactionInformation.DistributedIdentifier);
>         var session2 = activeMqSession;
>         {
>             // Enlist a second durable resource in the ambient transaction.
>             Transaction.Current.EnlistDurable(Guid.NewGuid(), x,
>                 EnlistmentOptions.None);
>             using (var producer = session2.CreateProducer(
>                 SessionUtil.GetQueue(session2, "queue://foo.baz")))
>             {
>                 producer.Send(new ActiveMQTextMessage("foo"));
>             }
>             // Fail the first delivery so the transaction rolls back and the
>             // message is redelivered.
>             if (!message.NMSRedelivered) throw new Exception();
>         }
>     }
> }
> internal class TestSinglePhaseCommit : ISinglePhaseNotification
> {
>     public void Prepare(PreparingEnlistment preparingEnlistment)
>     {
>         preparingEnlistment.Prepared();
>     }
>
>     public void Commit(Enlistment enlistment)
>     {
>         enlistment.Done();
>     }
>
>     public void Rollback(Enlistment enlistment)
>     {
>         enlistment.Done();
>     }
>
>     public void InDoubt(Enlistment enlistment)
>     {
>         enlistment.Done();
>     }
>
>     public void SinglePhaseCommit(SinglePhaseEnlistment singlePhaseEnlistment)
>     {
>         singlePhaseEnlistment.Committed();
>     }
> }
--
This message was sent by Atlassian JIRA
(v6.1#6144)