Author: tabish
Date: Wed Nov 3 20:59:54 2010
New Revision: 1030680
URL: http://svn.apache.org/viewvc?rev=1030680&view=rev
Log:
fix for: https://issues.apache.org/activemq/browse/AMQNET-290
Creates a .NET System.Transactions Session that will join in on the current
Ambient Transaction whenever it detects one.
Added:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
(with props)
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnectionFactory.cs
(with props)
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
(with props)
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1030680&r1=1030679&r2=1030680&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
Wed Nov 3 20:59:54 2010
@@ -504,6 +504,7 @@ namespace Apache.NMS.ActiveMQ
ack =
MakeAckForAllDeliveredMessages(AckType.DeliveredAck);
if(ack != null)
{
+ Tracer.Debug("Consumer - DeliverAcks clearing the
Dispatch list");
this.dispatchedMessages.Clear();
}
else
@@ -853,6 +854,8 @@ namespace Apache.NMS.ActiveMQ
if(!synchronizationRegistered)
{
+ Tracer.DebugFormat("Consumer {0} Registering new
MessageConsumerSynchronization",
+ this.info.ConsumerId);
this.synchronizationRegistered = true;
this.session.TransactionContext.AddSynchronization(new
MessageConsumerSynchronization(this));
}
@@ -963,6 +966,8 @@ namespace Apache.NMS.ActiveMQ
{
if(this.dispatchedMessages.Count == 0)
{
+ Tracer.DebugFormat("Consumer {0} Rolled Back, no
dispatched Messages",
+ this.info.ConsumerId);
return;
}
@@ -1020,9 +1025,15 @@ namespace Apache.NMS.ActiveMQ
// stop the delivery of
messages.
this.unconsumedMessages.Stop();
+ if(Tracer.IsDebugEnabled)
+ {
+ Tracer.DebugFormat("Consumer {0} Rolled Back,
Re-enque {1} messages",
+ this.info.ConsumerId,
this.dispatchedMessages.Count);
+ }
+
foreach(MessageDispatch
dispatch in this.dispatchedMessages)
{
-
this.unconsumedMessages.EnqueueFirst(dispatch);
+ this.unconsumedMessages.EnqueueFirst(dispatch);
}
if(redeliveryDelay > 0 &&
!this.unconsumedMessages.Closed)
@@ -1155,18 +1166,24 @@ namespace Apache.NMS.ActiveMQ
public void BeforeEnd()
{
+ Tracer.DebugFormat("MessageConsumerSynchronization - BeforeEnd
Called for Consumer {0}.",
+ this.consumer.ConsumerId);
this.consumer.Acknowledge();
this.consumer.synchronizationRegistered = false;
}
public void AfterCommit()
{
+ Tracer.DebugFormat("MessageConsumerSynchronization -
AfterCommit Called for Consumer {0}.",
+ this.consumer.ConsumerId);
this.consumer.Commit();
this.consumer.synchronizationRegistered = false;
}
public void AfterRollback()
{
+ Tracer.DebugFormat("MessageConsumerSynchronization -
AfterRollback Called for Consumer {0}.",
+ this.consumer.ConsumerId);
this.consumer.Rollback();
this.consumer.synchronizationRegistered = false;
}
Added:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs?rev=1030680&view=auto
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
(added)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
Wed Nov 3 20:59:54 2010
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS;
+using Apache.NMS.ActiveMQ.Transport;
+using Apache.NMS.ActiveMQ.Util;
+
+namespace Apache.NMS.ActiveMQ
+{
+    /// <summary>
+    /// A Connection that hands out NetTxSession instances, the session type that
+    /// joins .NET System.Transactions ambient transactions.
+    /// </summary>
+    public class NetTxConnection : Connection, INetTxConnection
+    {
+        public NetTxConnection(Uri connectionUri, ITransport transport, IdGenerator clientIdGenerator)
+            : base(connectionUri, transport, clientIdGenerator)
+        {
+        }
+
+        /// <summary>
+        /// Creates a session through CreateSession and returns it typed as an
+        /// INetTxSession.
+        /// </summary>
+        public INetTxSession CreateNetTxSession()
+        {
+            // The mode is only handed to CreateSession; the session factory
+            // override below does not inspect it.
+            return (INetTxSession) CreateSession(AcknowledgementMode.Transactional);
+        }
+
+        /// <summary>
+        /// Session factory override: always builds a NetTxSession bound to this
+        /// connection, whatever acknowledgement mode was requested.
+        /// </summary>
+        protected override Session CreateAtiveMQSession(AcknowledgementMode ackMode)
+        {
+            CheckConnected();
+
+            NetTxSession netTxSession = new NetTxSession(this, NextSessionId);
+            return netTxSession;
+        }
+    }
+}
+
Propchange:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnection.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnectionFactory.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnectionFactory.cs?rev=1030680&view=auto
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnectionFactory.cs
(added)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnectionFactory.cs
Wed Nov 3 20:59:54 2010
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS;
+using Apache.NMS.Util;
+using Apache.NMS.ActiveMQ.Transport;
+
+namespace Apache.NMS.ActiveMQ
+{
+    /// <summary>
+    /// ConnectionFactory variant whose connections are NetTxConnection instances.
+    /// The constructors simply forward to the matching base class constructors.
+    /// </summary>
+    public class NetTxConnectionFactory : ConnectionFactory, INetTxConnectionFactory
+    {
+        public NetTxConnectionFactory()
+            : base(GetDefaultBrokerUrl())
+        {
+        }
+
+        public NetTxConnectionFactory(string brokerUri)
+            : base(brokerUri, null)
+        {
+        }
+
+        public NetTxConnectionFactory(string brokerUri, string clientID)
+            : base(brokerUri, clientID)
+        {
+        }
+
+        public NetTxConnectionFactory(Uri brokerUri)
+            : base(brokerUri, null)
+        {
+        }
+
+        public NetTxConnectionFactory(Uri brokerUri, string clientID)
+            : base(brokerUri, clientID)
+        {
+        }
+
+        /// <summary>
+        /// Creates a connection through the base factory and returns it typed as
+        /// an INetTxConnection.
+        /// </summary>
+        public INetTxConnection CreateNetTxConnection()
+        {
+            return (INetTxConnection) base.CreateActiveMQConnection();
+        }
+
+        /// <summary>
+        /// Same as the parameterless overload, passing the given credentials on
+        /// to the base factory.
+        /// </summary>
+        public INetTxConnection CreateNetTxConnection(string userName, string password)
+        {
+            return (INetTxConnection) base.CreateActiveMQConnection(userName, password);
+        }
+
+        /// <summary>
+        /// Connection factory override: wraps the supplied transport in a
+        /// NetTxConnection using this factory's broker URI and client id generator.
+        /// </summary>
+        protected override Connection CreateActiveMQConnection(ITransport transport)
+        {
+            NetTxConnection netTxConnection =
+                new NetTxConnection(this.BrokerUri, transport, this.ClientIdGenerator);
+            return netTxConnection;
+        }
+    }
+}
+
Propchange:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxConnectionFactory.cs
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs?rev=1030680&view=auto
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
(added)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
Wed Nov 3 20:59:54 2010
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Transactions;
+using Apache.NMS;
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ
+{
+    /// <summary>
+    /// Session that takes part in .NET System.Transactions transactions. The
+    /// underlying Session is always constructed in AutoAcknowledge mode; the
+    /// transacted state is derived from the ambient transaction instead.
+    /// </summary>
+    public class NetTxSession : Session, INetTxSession
+    {
+        public NetTxSession(Connection connection, SessionId id)
+            : base(connection, id, AcknowledgementMode.AutoAcknowledge)
+        {
+        }
+
+        /// <summary>
+        /// Reports Transacted whenever there is an Ambient Transaction or the internal
+        /// TransactionContext is still involved in a .NET Transaction beyond the lifetime
+        /// of an ambient transaction (can happen when a scoped transaction is disposed
+        /// without Complete being called and a Rollback is in progress).
+        /// </summary>
+        public override bool IsTransacted
+        {
+            get
+            {
+                if(Transaction.Current != null)
+                {
+                    return true;
+                }
+
+                return TransactionContext.InNetTransaction;
+            }
+        }
+
+        /// <summary>
+        /// Always true: outside of a .NET Transaction the session behaves as Auto Ack.
+        /// </summary>
+        public override bool IsAutoAcknowledge
+        {
+            get { return true; }
+        }
+
+        /// <summary>
+        /// Always throws: inside a .NET Transaction only the Transaction Manager
+        /// may roll back.
+        /// </summary>
+        internal override void DoRollback()
+        {
+            throw new TransactionInProgressException("Cannot Rollback() inside an NetTxSession");
+        }
+
+        /// <summary>
+        /// Always throws: inside a .NET Transaction only the Transaction Manager
+        /// may commit.
+        /// </summary>
+        internal override void DoCommit()
+        {
+            throw new TransactionInProgressException("Cannot Commit() inside an NetTxSession");
+        }
+
+        /// <summary>
+        /// Begins a broker transaction for the ambient .NET transaction, unless the
+        /// TransactionContext is already enlisted or no ambient transaction exists.
+        /// </summary>
+        internal override void DoStartTransaction()
+        {
+            if(TransactionContext.InNetTransaction)
+            {
+                // Already enlisted, nothing to start.
+                return;
+            }
+
+            if(Transaction.Current == null)
+            {
+                // No ambient transaction to join.
+                return;
+            }
+
+            Tracer.Debug("NetTxSession detected Ambient Transaction, start new TX with broker");
+
+            // Start a new .NET style transaction, this could be distributed
+            // or it could just be a Local transaction that could become
+            // distributed later.
+            TransactionContext.Begin(Transaction.Current);
+        }
+    }
+}
+
Propchange:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/NetTxSession.cs
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=1030680&r1=1030679&r2=1030680&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
Wed Nov 3 20:59:54 2010
@@ -71,11 +71,7 @@ namespace Apache.NMS.ActiveMQ
this.acknowledgementMode = acknowledgementMode;
this.requestTimeout = connection.RequestTimeout;
this.dispatchAsync = connection.DispatchAsync;
-
- if(acknowledgementMode ==
AcknowledgementMode.Transactional)
- {
- this.transactionContext = new
TransactionContext(this);
- }
+ this.transactionContext = new TransactionContext(this);
Uri brokerUri = connection.BrokerUri;
@@ -186,39 +182,39 @@ namespace Apache.NMS.ActiveMQ
set { this.requestTimeout = value; }
}
- public bool Transacted
- {
- get { return this.AcknowledgementMode ==
AcknowledgementMode.Transactional; }
- }
+ public bool Transacted
+ {
+ get { return this.IsTransacted; }
+ }
- public AcknowledgementMode AcknowledgementMode
+ public virtual AcknowledgementMode AcknowledgementMode
{
get { return this.acknowledgementMode; }
}
- public bool IsClientAcknowledge
+ public virtual bool IsClientAcknowledge
{
get { return this.acknowledgementMode ==
AcknowledgementMode.ClientAcknowledge; }
}
- public bool IsAutoAcknowledge
+ public virtual bool IsAutoAcknowledge
{
get { return this.acknowledgementMode ==
AcknowledgementMode.AutoAcknowledge; }
}
- public bool IsDupsOkAcknowledge
+ public virtual bool IsDupsOkAcknowledge
{
get { return this.acknowledgementMode ==
AcknowledgementMode.DupsOkAcknowledge; }
}
- public bool IsIndividualAcknowledge
+ public virtual bool IsIndividualAcknowledge
{
get { return this.acknowledgementMode ==
AcknowledgementMode.IndividualAcknowledge; }
}
- public bool IsTransacted
+ public virtual bool IsTransacted
{
- get { return this.acknowledgementMode ==
AcknowledgementMode.Transactional; }
+ get{ return this.acknowledgementMode ==
AcknowledgementMode.Transactional; }
}
public SessionExecutor Executor
@@ -315,25 +311,21 @@ namespace Apache.NMS.ActiveMQ
try
{
- Tracer.InfoFormat("Closing The Session
with Id {0}", this.info.SessionId.ToString());
- DoClose();
- Tracer.InfoFormat("Closed The Session
with Id {0}", this.info.SessionId.ToString());
+ if(transactionContext.InNetTransaction)
+ {
+ this.transactionContext.AddSynchronization(new
SessionCloseSynchronization(this));
+ }
+ else
+ {
+ Tracer.InfoFormat("Closing The Session
with Id {0}", this.info.SessionId.ToString());
+ DoClose();
+ Tracer.InfoFormat("Closed The Session
with Id {0}", this.info.SessionId.ToString());
+ }
}
catch(Exception ex)
{
Tracer.ErrorFormat("Error during
session close: {0}", ex);
}
- finally
- {
- // Make sure we attempt to inform the
broker this Session is done.
- RemoveInfo info = new RemoveInfo();
- info.ObjectId = this.info.SessionId;
- info.LastDeliveredSequenceId =
this.lastDeliveredSequenceId;
- this.connection.Oneway(info);
- this.connection = null;
- this.closed = true;
- this.closing = false;
- }
}
}
@@ -393,8 +385,14 @@ namespace Apache.NMS.ActiveMQ
}
finally
{
- this.closed = true;
- this.closing = false;
+ // Make sure we attempt to inform the broker this Session
is done.
+ RemoveInfo info = new RemoveInfo();
+ info.ObjectId = this.info.SessionId;
+ info.LastDeliveredSequenceId =
this.lastDeliveredSequenceId;
+ this.connection.Oneway(info);
+ this.connection = null;
+ this.closed = true;
+ this.closing = false;
}
}
}
@@ -657,35 +655,21 @@ namespace Apache.NMS.ActiveMQ
public void Commit()
{
- if(!Transacted)
- {
- throw new InvalidOperationException(
- "You cannot perform a Commit()
on a non-transacted session. Acknowlegement mode is: "
- + this.AcknowledgementMode);
- }
-
- this.TransactionContext.Commit();
+ this.DoCommit();
}
public void Rollback()
{
- if(!Transacted)
- {
- throw new InvalidOperationException(
- "You cannot perform a Commit()
on a non-transacted session. Acknowlegement mode is: "
- + this.AcknowledgementMode);
- }
-
- this.TransactionContext.Rollback();
+ this.DoRollback();
}
#endregion
- public void DoSend( ActiveMQMessage message, MessageProducer
producer, MemoryUsage producerWindow, TimeSpan sendTimeout )
+ internal void DoSend( ActiveMQMessage message, MessageProducer
producer, MemoryUsage producerWindow, TimeSpan sendTimeout )
{
ActiveMQMessage msg = message;
- if(Transacted)
+ if(IsTransacted)
{
DoStartTransaction();
msg.TransactionId =
TransactionContext.TransactionId;
@@ -729,12 +713,36 @@ namespace Apache.NMS.ActiveMQ
}
}
+        /// <summary>
+        /// Commits through the TransactionContext when the session is transacted.
+        /// </summary>
+        /// <exception cref="InvalidOperationException">
+        /// Thrown when the session is not transacted.
+        /// </exception>
+        internal virtual void DoCommit()
+        {
+            if(IsTransacted)
+            {
+                this.TransactionContext.Commit();
+                return;
+            }
+
+            throw new InvalidOperationException(
+                "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: "
+                + this.AcknowledgementMode);
+        }
+
+        /// <summary>
+        /// Rolls back through the TransactionContext when the session is transacted.
+        /// </summary>
+        /// <exception cref="InvalidOperationException">
+        /// Thrown when the session is not transacted.
+        /// </exception>
+        internal virtual void DoRollback()
+        {
+            if(!IsTransacted)
+            {
+                // The message names Rollback(), the operation that was actually attempted.
+                throw new InvalidOperationException(
+                    "You cannot perform a Rollback() on a non-transacted session. Acknowlegement mode is: "
+                    + this.AcknowledgementMode);
+            }
+
+            this.TransactionContext.Rollback();
+        }
+
/// <summary>
/// Ensures that a transaction is started
/// </summary>
- public void DoStartTransaction()
+ internal virtual void DoStartTransaction()
{
- if(Transacted)
+ if(IsTransacted)
{
this.TransactionContext.Begin();
}
@@ -929,5 +937,29 @@ namespace Apache.NMS.ActiveMQ
{
}
+        /// <summary>
+        /// Synchronization that closes the owning session once the transaction has
+        /// finished, whether it was committed or rolled back.
+        /// </summary>
+        class SessionCloseSynchronization : ISynchronization
+        {
+            private readonly Session owner;
+
+            public SessionCloseSynchronization(Session session)
+            {
+                this.owner = session;
+            }
+
+            public void BeforeEnd()
+            {
+                // Nothing to do before the transaction ends.
+            }
+
+            public void AfterCommit()
+            {
+                this.owner.DoClose();
+            }
+
+            public void AfterRollback()
+            {
+                this.owner.DoClose();
+            }
+        }
+
}
}
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs?rev=1030680&r1=1030679&r2=1030680&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/TransactionContext.cs
Wed Nov 3 20:59:54 2010
@@ -14,6 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
+using System;
+using System.Text;
+using System.Net;
+using System.Transactions;
using System.Collections;
using Apache.NMS.ActiveMQ.Commands;
@@ -27,15 +32,21 @@ namespace Apache.NMS.ActiveMQ
namespace Apache.NMS.ActiveMQ
{
- public class TransactionContext
+ public class TransactionContext : IEnlistmentNotification
{
+ private const int XA_OK = 0;
+ private const int XA_READONLY = 3;
+
private TransactionId transactionId;
private readonly Session session;
+ private readonly Connection connection;
private readonly ArrayList synchronizations =
ArrayList.Synchronized(new ArrayList());
-
+ private Enlistment currentEnlistment;
+
public TransactionContext(Session session)
{
this.session = session;
+ this.connection = session.Connection;
}
public bool InTransaction
@@ -103,7 +114,6 @@ namespace Apache.NMS.ActiveMQ
this.session.Connection.SyncRequest(info);
this.AfterRollback();
- this.synchronizations.Clear();
}
}
@@ -129,7 +139,6 @@ namespace Apache.NMS.ActiveMQ
this.session.Connection.SyncRequest(info);
this.AfterCommit();
- this.synchronizations.Clear();
}
}
@@ -146,24 +155,293 @@ namespace Apache.NMS.ActiveMQ
internal void AfterCommit()
{
- lock(this.synchronizations.SyncRoot)
+ try
{
- foreach(ISynchronization synchronization in
this.synchronizations)
+ lock(this.synchronizations.SyncRoot)
{
- synchronization.AfterCommit();
+ foreach(ISynchronization synchronization in
this.synchronizations)
+ {
+ synchronization.AfterCommit();
+ }
}
}
+ finally
+ {
+ synchronizations.Clear();
+ }
}
internal void AfterRollback()
{
- lock(this.synchronizations.SyncRoot)
+ try
{
- foreach(ISynchronization synchronization in
this.synchronizations)
+ lock(this.synchronizations.SyncRoot)
+ {
+ foreach(ISynchronization synchronization in
this.synchronizations)
+ {
+ synchronization.AfterRollback();
+ }
+ }
+ }
+ finally
+ {
+ synchronizations.Clear();
+ }
+ }
+
+ #region Transaction Members used when dealing with .NET System
Transactions.
+
+        /// <summary>
+        /// True while this context holds an enlistment, i.e. from Begin(Transaction)
+        /// until one of the notification methods resets it.
+        /// </summary>
+        public bool InNetTransaction
+        {
+            get
+            {
+                return this.currentEnlistment != null;
+            }
+        }
+
+        /// <summary>
+        /// Enlists this context as a volatile participant in the given .NET
+        /// transaction, builds an XATransactionId from the transaction's identifiers
+        /// and sends a Begin command for it to the broker.
+        /// </summary>
+        /// <param name="transaction">The .NET transaction to join.</param>
+        /// <exception cref="TransactionInProgressException">
+        /// Thrown when this context is already enlisted in a .NET transaction.
+        /// </exception>
+        public void Begin(Transaction transaction)
+        {
+            Tracer.Debug("Begin notification received");
+
+            if(InNetTransaction)
+            {
+                throw new TransactionInProgressException("A Transaction is already in Progress");
+            }
+
+            // Enlist this object in the transaction.
+            this.currentEnlistment =
+                transaction.EnlistVolatile(this, EnlistmentOptions.None);
+
+            System.Transactions.TransactionInformation txInfo = transaction.TransactionInformation;
+
+            XATransactionId xaId = new XATransactionId();
+            this.transactionId = xaId;
+
+            if(txInfo.DistributedIdentifier != Guid.Empty)
+            {
+                // Distributed transaction: its Guid is the global id and the local
+                // identifier qualifies the branch.
+                xaId.GlobalTransactionId = txInfo.DistributedIdentifier.ToByteArray();
+                xaId.BranchQualifier = Encoding.UTF8.GetBytes(txInfo.LocalIdentifier);
+            }
+            else
+            {
+                // Local transaction: the local identifier is the global id and a
+                // fresh Guid qualifies the branch.
+                xaId.GlobalTransactionId = Encoding.UTF8.GetBytes(txInfo.LocalIdentifier);
+                xaId.BranchQualifier = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
+            }
+
+            // Now notify the broker that a new XA'ish transaction has started.
+            TransactionInfo info = new TransactionInfo();
+            info.ConnectionId = this.session.Connection.ConnectionId;
+            info.TransactionId = this.transactionId;
+            info.Type = (int) TransactionType.Begin;
+
+            this.session.Connection.Oneway(info);
+
+            if(Tracer.IsDebugEnabled)
+            {
+                // ToString() on a byte[] only yields the type name, so render the
+                // id bytes as hex to make the trace usable.
+                Tracer.Debug("Begin XA'ish Transaction:" +
+                             System.BitConverter.ToString(xaId.GlobalTransactionId));
+            }
+        }
+
+        /// <summary>
+        /// Phase one notification from the .NET transaction manager. Runs the
+        /// BeforeEnd callbacks, ends the branch on the broker and asks the broker to
+        /// prepare the transaction, then votes on the enlistment:
+        /// Done when the broker reports XA_READONLY, Prepared otherwise, and
+        /// ForceRollback when anything fails.
+        /// </summary>
+        /// <param name="preparingEnlistment">Enlistment used to report the vote.</param>
+        public void Prepare(PreparingEnlistment preparingEnlistment)
+        {
+            Tracer.Debug("Prepare notification received");
+
+            // One command object is reused for the End and the Prepare request.
+            TransactionInfo info = new TransactionInfo();
+            info.ConnectionId = this.session.Connection.ConnectionId;
+            info.TransactionId = this.transactionId;
+
+            try
+            {
+                BeforeEnd();
+
+                // End the current branch
+                info.Type = (int) TransactionType.End;
+                this.connection.SyncRequest(info);
+
+                // Prepare the Transaction for commit.
+                info.Type = (int) TransactionType.Prepare;
+                IntegerResponse response = (IntegerResponse) this.connection.SyncRequest(info);
+                if(response.Result == XA_READONLY)
+                {
+                    Tracer.Debug("Transaction Prepare Reports Done: ");
+
+                    // The broker has nothing to commit. Drop our state and vote
+                    // read-only with Done() rather than Prepared(), so that no
+                    // phase two notification is delivered for this enlistment.
+                    this.transactionId = null;
+                    this.currentEnlistment = null;
+
+                    preparingEnlistment.Done();
+
+                    // Commit will not be called, so run the commit callbacks here.
+                    AfterCommit();
+                }
+                else
+                {
+                    Tracer.Debug("Transaction Prepare finished Successfully: ");
+
+                    // If work finished correctly, reply prepared
+                    preparingEnlistment.Prepared();
+                }
+            }
+            catch(Exception ex)
+            {
+                Tracer.Debug("Transaction Prepare failed with error: " + ex.Message);
+
+                try
+                {
+                    AfterRollback();
+                    preparingEnlistment.ForceRollback();
+                }
+                finally
+                {
+                    // Reset the enlistment state here. Nothing else in this class
+                    // clears it on this path, and leaving it set would keep
+                    // InNetTransaction true so Begin would refuse every later
+                    // transaction.
+                    this.currentEnlistment = null;
+                    this.transactionId = null;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Commit notification from the .NET transaction manager. When a broker
+        /// transaction id is still held, sends the commit command to the broker,
+        /// marks the enlistment done and runs the AfterCommit callbacks. The
+        /// enlistment state is reset in every case.
+        /// </summary>
+        /// <param name="enlistment">Enlistment on which completion is reported.</param>
+        public void Commit(Enlistment enlistment)
+        {
+            Tracer.Debug("Commit notification received");
+
+            try
+            {
+                if(this.transactionId != null)
{
- synchronization.AfterRollback();
+                    // Ask the broker to commit the transaction.
+                    TransactionInfo info = new TransactionInfo();
+                    info.ConnectionId = this.session.Connection.ConnectionId;
+                    info.TransactionId = this.transactionId;
+                    info.Type = (int) TransactionType.CommitOnePhase;
+
+                    this.connection.CheckConnected();
+                    this.connection.SyncRequest(info);
+
+                    Tracer.Debug("Transaction Commit Reports Done: ");
+
+                    // The broker accepted the commit, so report completion.
+                    enlistment.Done();
+
+                    AfterCommit();
}
+                else
+                {
+                    // No broker transaction is left to commit (Prepare clears the id
+                    // when the broker reports read-only). Completion still has to be
+                    // reported to the transaction manager.
+                    enlistment.Done();
+                }
}
- }
+            catch(Exception ex)
+            {
+                Tracer.Debug("Transaction Commit failed with error: " + ex.Message);
+                AfterRollback();
+                enlistment.Done();
+                throw;
+            }
+            finally
+            {
+                this.currentEnlistment = null;
+                this.transactionId = null;
+            }
+        }
+
+        /// <summary>
+        /// Rollback notification from the .NET transaction manager. Runs the
+        /// BeforeEnd callbacks, ends the branch and rolls the transaction back on
+        /// the broker, marks the enlistment done and runs the AfterRollback
+        /// callbacks. On failure the callbacks still run, the enlistment is marked
+        /// done and the exception is rethrown. The enlistment state is reset in
+        /// every case.
+        /// </summary>
+        /// <param name="enlistment">Enlistment on which completion is reported.</param>
+        public void Rollback(Enlistment enlistment)
+        {
+            Tracer.Debug("Rollback notification received");
+
+            // One command object is reused for the End and the Rollback request.
+            TransactionInfo command = new TransactionInfo();
+            command.ConnectionId = this.session.Connection.ConnectionId;
+            command.TransactionId = this.transactionId;
+
+            try
+            {
+                BeforeEnd();
+
+                // End the current branch first.
+                command.Type = (int) TransactionType.End;
+                this.connection.SyncRequest(command);
+
+                // Then ask the broker to roll the transaction back.
+                command.Type = (int) TransactionType.Rollback;
+                this.connection.CheckConnected();
+                this.connection.SyncRequest(command);
+
+                Tracer.Debug("Transaction Rollback Reports Done: ");
+
+                // The broker accepted the rollback, so report completion.
+                enlistment.Done();
+
+                AfterRollback();
+            }
+            catch(Exception ex)
+            {
+                Tracer.Debug("Transaction Rollback failed with error: " + ex.Message);
+                AfterRollback();
+                enlistment.Done();
+                throw;
+            }
+            finally
+            {
+                this.transactionId = null;
+                this.currentEnlistment = null;
+            }
+        }
+
+        /// <summary>
+        /// In-doubt notification from the .NET transaction manager. Marks the
+        /// enlistment done and resets the enlistment state.
+        /// </summary>
+        /// <param name="enlistment">Enlistment on which completion is reported.</param>
+        public void InDoubt(Enlistment enlistment)
+        {
+            Tracer.Debug("In doubt notification received");
+
+            try
+            {
+                // NOTE(review): this Forget command is populated but never handed to
+                // the connection in this method - confirm whether it was meant to be
+                // sent to the broker.
+                TransactionInfo forget = new TransactionInfo();
+                forget.ConnectionId = this.session.Connection.ConnectionId;
+                forget.TransactionId = this.transactionId;
+                forget.Type = (int) TransactionType.Forget;
+
+                // Declare done on the enlistment
+                enlistment.Done();
+            }
+            finally
+            {
+                this.transactionId = null;
+                this.currentEnlistment = null;
+            }
+        }
+
+ #endregion
+
+        /// <summary>
+        /// Packs the numeric fields of an id of the form
+        /// "ID:host-port-timestamp-instance:sequence" into a Guid. The host name
+        /// is not encoded.
+        /// </summary>
+        /// <param name="id">The id string to convert.</param>
+        /// <returns>A Guid built from port, instance, sequence and timestamp.</returns>
+        /// <exception cref="FormatException">
+        /// Thrown when the id has fewer than five fields or a field is not numeric.
+        /// </exception>
+        private Guid GuidFromId(string id)
+        {
+            // Strip the literal "ID:" prefix only. TrimStart with a character set
+            // would also remove leading 'I', 'D' or ':' characters of the host name.
+            string resId = id.StartsWith("ID:", StringComparison.Ordinal) ? id.Substring(3) : id;
+
+            // Remaining parts should be host-port-timestamp-instance:sequence
+            string[] parts = resId.Split(":-".ToCharArray());
+            if(parts.Length < 5)
+            {
+                throw new FormatException("Unexpected id format: " + id);
+            }
+
+            // Index from the end: a host name containing '-' yields extra leading
+            // parts, which must not shift the numeric fields.
+            int last = parts.Length - 1;
+            int a = Int32.Parse(parts[last - 3]);        // port
+            short b = Int16.Parse(parts[last - 1]);      // instance
+            short c = Int16.Parse(parts[last]);          // sequence
+            byte[] d = System.BitConverter.GetBytes(Int64.Parse(parts[last - 2])); // timestamp
+
+            return new Guid(a, b, c, d);
+        }
+
+        /// <summary>
+        /// Builds an id string of the form "ID:host-port-timestamp-instance:sequence"
+        /// from the bytes of the given Guid, using the local host name (or
+        /// "localhost" when it cannot be determined).
+        /// </summary>
+        /// <param name="guid">The Guid whose bytes carry the numeric fields.</param>
+        /// <returns>The formatted id string.</returns>
+        private string IdFromGuid(Guid guid)
+        {
+            byte[] raw = guid.ToByteArray();
+
+            // Field layout in the byte array: 4 byte port, 2 byte instance,
+            // 2 byte sequence, 8 byte timestamp.
+            int port = System.BitConverter.ToInt32(raw, 0);
+            int instance = System.BitConverter.ToInt16(raw, 4);
+            int sequence = System.BitConverter.ToInt16(raw, 6);
+            long timestamp = System.BitConverter.ToInt64(raw, 8);
+
+            string hostname;
+            try
+            {
+                hostname = Dns.GetHostName();
+            }
+            catch
+            {
+                // Best effort only: fall back to a fixed name when the lookup fails.
+                hostname = "localhost";
+            }
+
+            return "ID:" + hostname + "-" + port + "-" + timestamp + "-" + instance + ":" + sequence;
+        }
}
}