Author: tabish
Date: Fri Mar 18 20:36:13 2011
New Revision: 1083055
URL: http://svn.apache.org/viewvc?rev=1083055&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQNET-321
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs?rev=1083055&r1=1083054&r2=1083055&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
Fri Mar 18 20:36:13 2011
@@ -16,6 +16,7 @@
*/
using System;
+using System.Transactions;
using Apache.NMS.ActiveMQ.Transport;
using Apache.NMS.ActiveMQ.Util;
@@ -42,6 +43,13 @@ namespace Apache.NMS.ActiveMQ
return (INetTxSession)
CreateSession(AcknowledgementMode.Transactional);
}
+ public INetTxSession CreateNetTxSession(Transaction tx)
+ {
+ NetTxSession session =
(NetTxSession)CreateSession(AcknowledgementMode.Transactional);
+ session.Enlist(tx);
+ return session;
+ }
+
protected override Session CreateAtiveMQSession(AcknowledgementMode
ackMode)
{
CheckConnected();
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs?rev=1083055&r1=1083054&r2=1083055&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
Fri Mar 18 20:36:13 2011
@@ -17,7 +17,6 @@
using System;
using System.Transactions;
-using Apache.NMS;
using Apache.NMS.ActiveMQ.Commands;
namespace Apache.NMS.ActiveMQ
@@ -31,6 +30,23 @@ namespace Apache.NMS.ActiveMQ
}
/// <summary>
+ /// Manually Enlists in the given Transaction. This can be used
when the
+ /// client is using the Session in Asynchronous listener mode since
the Session
+ /// cannot automatically join in this case as there is no Ambient
transaction in
+ /// the Message Dispatch thread. This also allows for clients to use
the explicit
+ /// exception model when necessary.
+ /// </summary>
+ public void Enlist(Transaction tx)
+ {
+ if(tx == null)
+ {
+ throw new NullReferenceException("Specified Transaction cannot
be null");
+ }
+
+ this.EnrollInSpecifiedTransaction(tx);
+ }
+
+ /// <summary>
/// Reports Transacted whenever there is an Ambient Transaction or the
internal
/// TransactionContext is still involved in a .NET Transaction beyond
the lifetime
/// of an ambient transaction (can happen during a scoped transaction
disposing
@@ -61,20 +77,38 @@ namespace Apache.NMS.ActiveMQ
internal override void DoStartTransaction()
{
- if(!TransactionContext.InNetTransaction)
+ if(!TransactionContext.InNetTransaction && Transaction.Current !=
null)
{
- if(Transaction.Current != null)
- {
- Tracer.Debug("NetTxSession detected Ambient Transaction,
start new TX with broker");
-
- // Start a new .NET style transaction, this could be
distributed
- // or it could just be a Local transaction that could
become
- // distributed later.
- TransactionContext.Begin(Transaction.Current);
- }
+ Tracer.Debug("NetTxSession detected Ambient Transaction, start
new TX with broker");
+
+ EnrollInSpecifiedTransaction(Transaction.Current);
}
}
+ private void EnrollInSpecifiedTransaction(Transaction tx)
+ {
+ // If an Async DTC operation is in progress such as Commit or
Rollback
+ // we need to let it complete before deciding if the Session is in
a TX
+ // otherwise we might error out for no reason.
+
TransactionContext.DtcWaitHandle.WaitOne(TimeSpan.FromMilliseconds(1000), true);
+
+ if(TransactionContext.InNetTransaction)
+ {
+ Tracer.Warn("Enlist attempted while a Net TX was Active.");
+ throw new InvalidOperationException("Session is Already
enlisted in a Transaction");
+ }
+
+ if(Transaction.Current != null && !Transaction.Current.Equals(tx))
+ {
+ Tracer.Warn("Enlist attempted with a TX that doesn't match the
Ambient TX.");
+ throw new ArgumentException("Specified TX must match the
ambient TX if set.");
+ }
+
+ // Start a new .NET style transaction, this could be distributed
+ // or it could just be a Local transaction that could become
+ // distributed later.
+ TransactionContext.Begin(tx);
+ }
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs?rev=1083055&r1=1083054&r2=1083055&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
Fri Mar 18 20:36:13 2011
@@ -316,22 +316,33 @@ namespace Apache.NMS.ActiveMQ.Test
INetTxConnectionFactory factory = new
NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
using (INetTxConnection connection =
factory.CreateNetTxConnection())
- using (INetTxSession session = connection.CreateNetTxSession())
+ using (NetTxSession session = connection.CreateNetTxSession() as
NetTxSession)
{
IQueue queue = session.GetQueue(testQueueName);
IMessageConsumer consumer = session.CreateConsumer(queue);
consumer.Listener += AsyncTxAwareOnMessage;
+ // Be careful, messages are dispatched once this is done, so
you could receive
+ // a Message outside a TX. We use the
awaitBatchProcessingStart event here to
+ // gate the OnMessage callback, once that method returns the
Message is ack'd and
+ // no longer has a chance to participate in a TX.
connection.Start();
for (int i = 0; i < BATCH_COUNT; ++i)
{
using (TransactionScope scoped = new
TransactionScope(TransactionScopeOption.RequiresNew))
{
+ session.Enlist(Transaction.Current);
+
batchTxControl =
Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
awaitBatchProcessingStart.Set();
scoped.Complete();
- }
+ }
+
+ // Reenlisting too fast seems to annoy the DTC. Also since
DTC operations are
+ // async we need to allow a little time for lag so that
the last TX actually
+ // completes before we start a new one.
+ Thread.Sleep(250);
}
}