Author: tabish
Date: Mon May 20 20:18:32 2013
New Revision: 1484579
URL: http://svn.apache.org/r1484579
Log:
fix for: https://issues.apache.org/jira/browse/AMQNET-412
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxTransactionContext.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs?rev=1484579&r1=1484578&r2=1484579&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
Mon May 20 20:18:32 2013
@@ -25,6 +25,7 @@ namespace Apache.NMS.ActiveMQ
public sealed class NetTxSession : Session, INetTxSession
{
private readonly NetTxTransactionContext transactionContext;
+ private string currentTransactionId;
public NetTxSession(Connection connection, SessionId id)
: base(connection, id, AcknowledgementMode.AutoAcknowledge)
@@ -128,8 +129,15 @@ namespace Apache.NMS.ActiveMQ
{
lock (transactionContext.SyncRoot)
{
- if (transactionContext.InNetTransaction &&
transactionContext.NetTxState == NetTxTransactionContext.TxState.Pending)
+ while (transactionContext.InNetTransaction &&
+ (transactionContext.NetTxState ==
NetTxTransactionContext.TxState.Pending ||
+ (Transaction.Current != null &&
+ this.currentTransactionId !=
Transaction.Current.TransactionInformation.LocalIdentifier)))
{
+ if (Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("NetTxSession awaiting completion
of TX:{0}", transactionContext.TransactionId);
+ }
// Too late to participate in this TX; we have to wait for it to complete,
// then we can create a new TX and start from there.
Monitor.Exit(transactionContext.SyncRoot);
@@ -162,6 +170,7 @@ namespace Apache.NMS.ActiveMQ
// Start a new .NET style transaction, this could be distributed
// or it could just be a Local transaction that could become
// distributed later.
+ this.currentTransactionId =
tx.TransactionInformation.LocalIdentifier;
transactionContext.Begin(tx);
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxTransactionContext.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxTransactionContext.cs?rev=1484579&r1=1484578&r2=1484579&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxTransactionContext.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxTransactionContext.cs
Mon May 20 20:18:32 2013
@@ -102,7 +102,6 @@ namespace Apache.NMS.ActiveMQ
{
lock (syncObject)
{
- this.netTxState = TxState.Active;
dtcControlEvent.Reset();
Tracer.Debug("Begin notification received");
@@ -120,6 +119,10 @@ namespace Apache.NMS.ActiveMQ
this.currentEnlistment =
transaction.EnlistDurable(rmId, this,
EnlistmentOptions.None);
+ // In case of an exception in the current method the transaction will be rolled back.
+ // Until Begin Transaction has completed we consider ourselves to be in a rollback scenario.
+ this.netTxState = TxState.Pending;
+
Tracer.Debug("Enlisted in Durable Transaction with RM Id:
" + rmId);
TransactionInformation txInfo =
transaction.TransactionInformation;
@@ -146,6 +149,9 @@ namespace Apache.NMS.ActiveMQ
this.session.Connection.Oneway(info);
+ // Begin Transaction completed successfully; change to the active transaction state now.
+ this.netTxState = TxState.Active;
+
SignalTransactionStarted();
if (Tracer.IsDebugEnabled)
@@ -155,7 +161,12 @@ namespace Apache.NMS.ActiveMQ
}
catch (Exception)
{
- dtcControlEvent.Set();
+ // When in pending state the rollback will signal that a
new transaction can be started. Otherwise do it here.
+ if (netTxState != TxState.Pending)
+ {
+ netTxState = TxState.None;
+ dtcControlEvent.Set();
+ }
throw;
}
}
@@ -201,9 +212,11 @@ namespace Apache.NMS.ActiveMQ
// change on the broker.
RecoveryLogger.LogRecovered(this.transactionId as
XATransactionId);
- // if server responds that nothing needs to be done,
then reply prepared
- // but clear the current state data so we appear done
to the commit method.
- preparingEnlistment.Prepared();
+ // If the server responds that nothing needs to be done, then reply done.
+ // Otherwise the DTC will call Commit or Rollback, but another transaction
+ // can already be in progress and this one would be committed or rolled back
+ // immediately.
+ preparingEnlistment.Done();
// Done so commit won't be called.
AfterCommit();
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs?rev=1484579&r1=1484578&r2=1484579&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
Mon May 20 20:18:32 2013
@@ -571,6 +571,86 @@ namespace Apache.NMS.ActiveMQ.Test
VerifyBrokerQueueCount();
}
+ [Test]
+ public void MessageShouldEnlistToTheCorrectTransaction()
+ {
+ const int messageCount = 100;
+ const int receiveCount = 100;
+
+ // enqueue several messages
+ PurgeDatabase();
+ PurgeAndFillQueue(messageCount);
+
+ var enlistment = new TestSinglePhaseCommit();
+
+ using (INetTxConnection connection =
factory.CreateNetTxConnection())
+ {
+ connection.Start();
+
+ // make receiveCount receive attempts, each inside its own new TransactionScope
+ using (INetTxSession session = connection.CreateNetTxSession())
+ {
+ IQueue queue = session.GetQueue(testQueueName);
+ using (IMessageConsumer consumer =
session.CreateConsumer(queue))
+ {
+ for (int i = 0; i < receiveCount; i++)
+ {
+ try
+ {
+ using (TransactionScope scoped = new
TransactionScope(TransactionScopeOption.RequiresNew))
+ {
+ ITextMessage message =
consumer.Receive(TimeSpan.FromMilliseconds(10000)) as ITextMessage;
+
+
Transaction.Current.EnlistDurable(Guid.NewGuid(), enlistment,
EnlistmentOptions.None);
+ if (new Random().Next(2) == 0)
+ {
+ Tracer.InfoFormat("Throwing random
Exception for Message {0}", message.NMSMessageId);
+ throw new Exception();
+ }
+
+ scoped.Complete();
+ }
+ }
+ catch
+ {
+ }
+
+ Assert.False(enlistment.singlePhaseCommit, "No
single phase commit should happen.");
+ }
+ }
+ }
+ }
+ }
+
+ internal class TestSinglePhaseCommit : ISinglePhaseNotification
+ {
+ public bool singlePhaseCommit = false;
+
+ public void Prepare(PreparingEnlistment preparingEnlistment)
+ {
+ preparingEnlistment.Prepared();
+ }
+ public void Commit(Enlistment enlistment)
+ {
+ enlistment.Done();
+ }
+
+ public void Rollback(Enlistment enlistment)
+ {
+ enlistment.Done();
+ }
+ public void InDoubt(Enlistment enlistment)
+ {
+ enlistment.Done();
+ }
+ public void SinglePhaseCommit(SinglePhaseEnlistment
singlePhaseEnlistment)
+ {
+ Tracer.Info("Performing invalid single phase commit.");
+ singlePhaseCommit = true;
+ singlePhaseEnlistment.Committed();
+ }
+ }
+
#region Asynchronous Consumer Inside of a Transaction Test / Example
private const int BATCH_COUNT = 5;