Author: tabish
Date: Fri Mar 18 20:36:13 2011
New Revision: 1083055

URL: http://svn.apache.org/viewvc?rev=1083055&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQNET-321

Modified:
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs?rev=1083055&r1=1083054&r2=1083055&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
 Fri Mar 18 20:36:13 2011
@@ -16,6 +16,7 @@
  */
 
 using System;
+using System.Transactions;
 using Apache.NMS.ActiveMQ.Transport;
 using Apache.NMS.ActiveMQ.Util;
 
@@ -42,6 +43,13 @@ namespace Apache.NMS.ActiveMQ
             return (INetTxSession) 
CreateSession(AcknowledgementMode.Transactional);
         }
 
+        public INetTxSession CreateNetTxSession(Transaction tx)
+        {
+            NetTxSession session = 
(NetTxSession)CreateSession(AcknowledgementMode.Transactional);
+            session.Enlist(tx);
+            return session;
+        }
+
         protected override Session CreateAtiveMQSession(AcknowledgementMode 
ackMode)
         {
             CheckConnected();

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs?rev=1083055&r1=1083054&r2=1083055&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
 Fri Mar 18 20:36:13 2011
@@ -17,7 +17,6 @@
 
 using System;
 using System.Transactions;
-using Apache.NMS;
 using Apache.NMS.ActiveMQ.Commands;
 
 namespace Apache.NMS.ActiveMQ
@@ -31,6 +30,23 @@ namespace Apache.NMS.ActiveMQ
         }
 
         /// <summary>
+        /// Manually Enlists in the given Transaction.  This can be used 
when the
+        /// client is using the Session in Asynchronous listener mode since 
the Session
+        /// cannot automatically join in this case as there is no Ambient 
transaction in
+        /// the Message Dispatch thread.  This also allows for clients to use 
the explicit
+        /// exception model when necessary.  
+        /// </summary>
+        public void Enlist(Transaction tx)
+        {
+            if(tx == null)
+            {
+                throw new NullReferenceException("Specified Transaction cannot 
be null");
+            }
+
+            this.EnrollInSpecifiedTransaction(tx);
+        }
+
+        /// <summary>
         /// Reports Transacted whenever there is an Ambient Transaction or the 
internal
         /// TransactionContext is still involed in a .NET Transaction beyond 
the lifetime
         /// of an ambient transaction (can happen during a scoped transaction 
disposing
@@ -61,20 +77,38 @@ namespace Apache.NMS.ActiveMQ
 
         internal override void DoStartTransaction()
         {
-            if(!TransactionContext.InNetTransaction)
+            if(!TransactionContext.InNetTransaction && Transaction.Current != 
null)
             {
-                if(Transaction.Current != null)
-                {
-                    Tracer.Debug("NetTxSession detected Ambient Transaction, 
start new TX with broker");
-
-                    // Start a new .NET style transaction, this could be 
distributed
-                    // or it could just be a Local transaction that could 
become
-                    // distributed later.
-                    TransactionContext.Begin(Transaction.Current);
-                }
+                Tracer.Debug("NetTxSession detected Ambient Transaction, start 
new TX with broker");
+
+                EnrollInSpecifiedTransaction(Transaction.Current);
             }
         }
 
+        private void EnrollInSpecifiedTransaction(Transaction tx)
+        {
+            // If an Async DTC operation is in progress such as Commit or 
Rollback
+            // we need to let it complete before deciding if the Session is in 
a TX
+            // otherwise we might error out for no reason.
+            
TransactionContext.DtcWaitHandle.WaitOne(TimeSpan.FromMilliseconds(1000), true);
+
+            if(TransactionContext.InNetTransaction)
+            {
+                Tracer.Warn("Enlist attempted while a Net TX was Active.");
+                throw new InvalidOperationException("Session is Already 
enlisted in a Transaction");
+            }
+
+            if(Transaction.Current != null && !Transaction.Current.Equals(tx))
+            {
+                Tracer.Warn("Enlist attempted with a TX that doesn't match the 
Ambient TX.");
+                throw new ArgumentException("Specified TX must match the 
ambient TX if set.");
+            }
+            
+            // Start a new .NET style transaction, this could be distributed
+            // or it could just be a Local transaction that could become
+            // distributed later.
+            TransactionContext.Begin(tx);
+        }
     }
 }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs?rev=1083055&r1=1083054&r2=1083055&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/test/csharp/DtcConsumerTransactionsTest.cs
 Fri Mar 18 20:36:13 2011
@@ -316,22 +316,33 @@ namespace Apache.NMS.ActiveMQ.Test
             INetTxConnectionFactory factory = new 
NetTxConnectionFactory(ReplaceEnvVar(connectionURI));
 
             using (INetTxConnection connection = 
factory.CreateNetTxConnection())
-            using (INetTxSession session = connection.CreateNetTxSession())
+            using (NetTxSession session = connection.CreateNetTxSession() as 
NetTxSession)
             {
                 IQueue queue = session.GetQueue(testQueueName);
                 IMessageConsumer consumer = session.CreateConsumer(queue);
                 consumer.Listener += AsyncTxAwareOnMessage;
 
+                // Be careful, messages are dispatched once this is done, so 
you could receive
+                // a Message outside a TX.  We use the 
awaitBatchProcessingStart event here to
+                // gate the OnMessage callback; once that method returns the 
Message is ack'd and
+                // no longer has a chance to participate in a TX.
                 connection.Start();
 
                 for (int i = 0; i < BATCH_COUNT; ++i)
                 {
                     using (TransactionScope scoped = new 
TransactionScope(TransactionScopeOption.RequiresNew))
                     {
+                        session.Enlist(Transaction.Current);
+
                         batchTxControl = 
Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
                         awaitBatchProcessingStart.Set();
                         scoped.Complete();
-                    }                    
+                    }
+
+                    // Re-enlisting too fast seems to annoy the DTC.  Also since 
DTC operations are
+                    // async we need to allow a little time for lag so that 
the last TX actually 
+                    // completes before we start a new one.
+                    Thread.Sleep(250);
                 }
             }
 


Reply via email to