fix for: https://issues.apache.org/activemq/browse/AMQNET-271


Project: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/commit/48f3e707
Tree: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/tree/48f3e707
Diff: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/diff/48f3e707

Branch: refs/heads/1.4.x
Commit: 48f3e707949d41bb18b37a675575b2a823ba176f
Parents: be02596
Author: Timothy A. Bish <[email protected]>
Authored: Sun Aug 29 18:44:38 2010 +0000
Committer: Timothy A. Bish <[email protected]>
Committed: Sun Aug 29 18:44:38 2010 +0000

----------------------------------------------------------------------
 trunk/src/main/csharp/Connection.cs        | 402 +++++++++---------
 trunk/src/main/csharp/ConnectionFactory.cs | 212 +++++-----
 trunk/src/main/csharp/MessageConsumer.cs   | 433 ++++++++++----------
 trunk/src/main/csharp/MessageProducer.cs   | 524 ++++++++++++------------
 trunk/src/main/csharp/Session.cs           | 464 +++++++++++----------
 5 files changed, 1058 insertions(+), 977 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/48f3e707/trunk/src/main/csharp/Connection.cs
----------------------------------------------------------------------
diff --git a/trunk/src/main/csharp/Connection.cs 
b/trunk/src/main/csharp/Connection.cs
index 096e41f..13dda20 100644
--- a/trunk/src/main/csharp/Connection.cs
+++ b/trunk/src/main/csharp/Connection.cs
@@ -19,198 +19,212 @@ using System;
 
 namespace Apache.NMS.MSMQ
 {
-       /// <summary>
-       /// Represents a NMS connection MSMQ.  Since the underlying MSMQ APIs 
are actually
-       /// connectionless, NMS connection in the MSMQ case are not expensive 
operations.
-       /// </summary>
-       ///
-       public class Connection : IConnection
-       {
-               private AcknowledgementMode acknowledgementMode = 
AcknowledgementMode.AutoAcknowledge;
-               private IMessageConverter messageConverter = new 
DefaultMessageConverter();
-
-               private IRedeliveryPolicy redeliveryPolicy;
-               private ConnectionMetaData metaData = null;
-               private bool connected;
-               private bool closed;
-               private string clientId;
-
-               /// <summary>
-               /// Starts message delivery for this connection.
-               /// </summary>
-               public void Start()
-               {
-                       CheckConnected();
-               }
-
-               /// <summary>
-               /// This property determines if the asynchronous message 
delivery of incoming
-               /// messages has been started for this connection.
-               /// </summary>
-               public bool IsStarted
-               {
-                       get { return true; }
-               }
-
-               /// <summary>
-               /// Stop message delivery for this connection.
-               /// </summary>
-               public void Stop()
-               {
-                       CheckConnected();
-               }
-
-               /// <summary>
-               /// Creates a new session to work on this connection
-               /// </summary>
-               public ISession CreateSession()
-               {
-                       return CreateSession(acknowledgementMode);
-               }
-
-               /// <summary>
-               /// Creates a new session to work on this connection
-               /// </summary>
-               public ISession CreateSession(AcknowledgementMode mode)
-               {
-                       CheckConnected();
-                       return new Session(this, mode);
-               }
-
-               public void Dispose()
-               {
-                       closed = true;
-               }
-
-               /// <summary>
-               /// The default timeout for network requests.
-               /// </summary>
-               public TimeSpan RequestTimeout
-               {
-                       get { return NMSConstants.defaultRequestTimeout; }
-                       set { }
-               }
-
-               public AcknowledgementMode AcknowledgementMode
-               {
-                       get { return acknowledgementMode; }
-                       set { acknowledgementMode = value; }
-               }
-
-               public IMessageConverter MessageConverter
-               {
-                       get { return messageConverter; }
-                       set { messageConverter = value; }
-               }
-
-               public string ClientId
-               {
-                       get { return clientId; }
-                       set
-                       {
-                               if(connected)
-                               {
-                                       throw new NMSException("You cannot 
change the ClientId once the Connection is connected");
-                               }
-                               clientId = value;
-                       }
-               }
-
-               /// <summary>
-               /// Get/or set the redelivery policy for this connection.
-               /// </summary>
-               public IRedeliveryPolicy RedeliveryPolicy
-               {
-                       get { return this.redeliveryPolicy; }
-                       set { this.redeliveryPolicy = value; }
-               }
-
-               /// <summary>
-               /// Gets the Meta Data for the NMS Connection instance.
-               /// </summary>
-               public IConnectionMetaData MetaData
-               {
-                       get { return this.metaData ?? (this.metaData = new 
ConnectionMetaData()); }
-               }
-
-               /// <summary>
-               /// A delegate that can receive transport level exceptions.
-               /// </summary>
-               public event ExceptionListener ExceptionListener;
-
-               /// <summary>
-               /// An asynchronous listener that is notified when a Fault 
tolerant connection
-               /// has been interrupted.
-               /// </summary>
-               public event ConnectionInterruptedListener 
ConnectionInterruptedListener;
-
-               /// <summary>
-               /// An asynchronous listener that is notified when a Fault 
tolerant connection
-               /// has been resumed.
-               /// </summary>
-               public event ConnectionResumedListener 
ConnectionResumedListener;
-
-               protected void CheckConnected()
-               {
-                       if(closed)
-                       {
-                               throw new NMSException("Connection Closed");
-                       }
-                       if(!connected)
-                       {
-                               connected = true;
-                               // now lets send the connection and see if we 
get an ack/nak
-                               // TODO: establish a connection
-                       }
-               }
-
-               public void Close()
-               {
-                       Dispose();
-               }
-
-               public void HandleException(Exception e)
-               {
-                       if(ExceptionListener != null && !this.closed)
-                       {
-                               ExceptionListener(e);
-                       }
-                       else
-                       {
-                               Tracer.Error(e);
-                       }
-               }
-
-               public void HandleTransportInterrupted()
-               {
-                       Tracer.Debug("Transport has been Interrupted.");
-
-                       if(this.ConnectionInterruptedListener != null && 
!this.closed)
-                       {
-                               try
-                               {
-                                       this.ConnectionInterruptedListener();
-                               }
-                               catch
-                               {
-                               }
-                       }
-               }
-
-               public void HandleTransportResumed()
-               {
-                       Tracer.Debug("Transport has resumed normal operation.");
-
-                       if(this.ConnectionResumedListener != null && 
!this.closed)
-                       {
-                               try
-                               {
-                                       this.ConnectionResumedListener();
-                               }
-                               catch
-                               {
-                               }
-                       }
-               }
-       }
+    /// <summary>
+    /// Represents a NMS connection MSMQ.  Since the underlying MSMQ APIs are 
actually
+    /// connectionless, NMS connection in the MSMQ case are not expensive 
operations.
+    /// </summary>
+    ///
+    public class Connection : IConnection
+    {
+        private AcknowledgementMode acknowledgementMode = 
AcknowledgementMode.AutoAcknowledge;
+        private IMessageConverter messageConverter = new 
DefaultMessageConverter();
+
+        private IRedeliveryPolicy redeliveryPolicy;
+        private ConnectionMetaData metaData = null;
+        private bool connected;
+        private bool closed;
+        private string clientId;
+
+        /// <summary>
+        /// Starts message delivery for this connection.
+        /// </summary>
+        public void Start()
+        {
+            CheckConnected();
+        }
+
+        /// <summary>
+        /// This property determines if the asynchronous message delivery of 
incoming
+        /// messages has been started for this connection.
+        /// </summary>
+        public bool IsStarted
+        {
+            get { return true; }
+        }
+
+        /// <summary>
+        /// Stop message delivery for this connection.
+        /// </summary>
+        public void Stop()
+        {
+            CheckConnected();
+        }
+
+        /// <summary>
+        /// Creates a new session to work on this connection
+        /// </summary>
+        public ISession CreateSession()
+        {
+            return CreateSession(acknowledgementMode);
+        }
+
+        /// <summary>
+        /// Creates a new session to work on this connection
+        /// </summary>
+        public ISession CreateSession(AcknowledgementMode mode)
+        {
+            CheckConnected();
+            return new Session(this, mode);
+        }
+
+        public void Dispose()
+        {
+            closed = true;
+        }
+
+        /// <summary>
+        /// The default timeout for network requests.
+        /// </summary>
+        public TimeSpan RequestTimeout
+        {
+            get { return NMSConstants.defaultRequestTimeout; }
+            set { }
+        }
+
+        public AcknowledgementMode AcknowledgementMode
+        {
+            get { return acknowledgementMode; }
+            set { acknowledgementMode = value; }
+        }
+
+        public IMessageConverter MessageConverter
+        {
+            get { return messageConverter; }
+            set { messageConverter = value; }
+        }
+
+        public string ClientId
+        {
+            get { return clientId; }
+            set
+            {
+                if(connected)
+                {
+                    throw new NMSException("You cannot change the ClientId 
once the Connection is connected");
+                }
+                clientId = value;
+            }
+        }
+
+        /// <summary>
+        /// Get/or set the redelivery policy for this connection.
+        /// </summary>
+        public IRedeliveryPolicy RedeliveryPolicy
+        {
+            get { return this.redeliveryPolicy; }
+            set { this.redeliveryPolicy = value; }
+        }
+
+        private ConsumerTransformerDelegate consumerTransformer;
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get { return this.consumerTransformer; }
+            set { this.consumerTransformer = value; }
+        }
+
+        private ProducerTransformerDelegate producerTransformer;
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get { return this.producerTransformer; }
+            set { this.producerTransformer = value; }
+        }
+
+        /// <summary>
+        /// Gets the Meta Data for the NMS Connection instance.
+        /// </summary>
+        public IConnectionMetaData MetaData
+        {
+            get { return this.metaData ?? (this.metaData = new 
ConnectionMetaData()); }
+        }
+
+        /// <summary>
+        /// A delegate that can receive transport level exceptions.
+        /// </summary>
+        public event ExceptionListener ExceptionListener;
+
+        /// <summary>
+        /// An asynchronous listener that is notified when a Fault tolerant 
connection
+        /// has been interrupted.
+        /// </summary>
+        public event ConnectionInterruptedListener 
ConnectionInterruptedListener;
+
+        /// <summary>
+        /// An asynchronous listener that is notified when a Fault tolerant 
connection
+        /// has been resumed.
+        /// </summary>
+        public event ConnectionResumedListener ConnectionResumedListener;
+
+        protected void CheckConnected()
+        {
+            if(closed)
+            {
+                throw new NMSException("Connection Closed");
+            }
+            if(!connected)
+            {
+                connected = true;
+                // now lets send the connection and see if we get an ack/nak
+                // TODO: establish a connection
+            }
+        }
+
+        public void Close()
+        {
+            Dispose();
+        }
+
+        public void HandleException(Exception e)
+        {
+            if(ExceptionListener != null && !this.closed)
+            {
+                ExceptionListener(e);
+            }
+            else
+            {
+                Tracer.Error(e);
+            }
+        }
+
+        public void HandleTransportInterrupted()
+        {
+            Tracer.Debug("Transport has been Interrupted.");
+
+            if(this.ConnectionInterruptedListener != null && !this.closed)
+            {
+                try
+                {
+                    this.ConnectionInterruptedListener();
+                }
+                catch
+                {
+                }
+            }
+        }
+
+        public void HandleTransportResumed()
+        {
+            Tracer.Debug("Transport has resumed normal operation.");
+
+            if(this.ConnectionResumedListener != null && !this.closed)
+            {
+                try
+                {
+                    this.ConnectionResumedListener();
+                }
+                catch
+                {
+                }
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/48f3e707/trunk/src/main/csharp/ConnectionFactory.cs
----------------------------------------------------------------------
diff --git a/trunk/src/main/csharp/ConnectionFactory.cs 
b/trunk/src/main/csharp/ConnectionFactory.cs
index 24e895d..fe9beee 100644
--- a/trunk/src/main/csharp/ConnectionFactory.cs
+++ b/trunk/src/main/csharp/ConnectionFactory.cs
@@ -19,101 +19,119 @@ using Apache.NMS.Policies;
 
 namespace Apache.NMS.MSMQ
 {
-       /// <summary>
-       /// A Factory that can estbalish NMS connections to MSMQ
-       /// </summary>
-       public class ConnectionFactory : IConnectionFactory
-       {
-               public const string DEFAULT_BROKER_URL = "msmq://localhost";
-               public const string ENV_BROKER_URL = "MSMQ_BROKER_URL";
-               private Uri brokerUri;
-               private IRedeliveryPolicy redeliveryPolicy = new 
RedeliveryPolicy();
-
-               public static string GetDefaultBrokerUrl()
-               {
-                       string answer = 
Environment.GetEnvironmentVariable(ENV_BROKER_URL);
-                       if(answer == null)
-                       {
-                               answer = DEFAULT_BROKER_URL;
-                       }
-                       return answer;
-               }
-
-               public ConnectionFactory()
-                       : this(GetDefaultBrokerUrl())
-               {
-               }
-
-               public ConnectionFactory(string brokerUri)
-                       : this(brokerUri, null)
-               {
-               }
-
-               public ConnectionFactory(string brokerUri, string clientID)
-                       : this(new Uri(brokerUri), clientID)
-               {
-               }
-
-               public ConnectionFactory(Uri brokerUri)
-                       : this(brokerUri, null)
-               {
-               }
-
-               public ConnectionFactory(Uri brokerUri, string clientID)
-               {
-                       this.brokerUri = brokerUri;
-               }
-
-               /// <summary>
-               /// Creates a new connection to MSMQ.
-               /// </summary>
-               public IConnection CreateConnection()
-               {
-                       return CreateConnection(string.Empty, string.Empty, 
false);
-               }
-
-               /// <summary>
-               /// Creates a new connection to MSMQ.
-               /// </summary>
-               public IConnection CreateConnection(string userName, string 
password)
-               {
-                       return CreateConnection(userName, password, false);
-               }
-
-               /// <summary>
-               /// Creates a new connection to MSMQ.
-               /// </summary>
-               public IConnection CreateConnection(string userName, string 
password, bool useLogging)
-               {
-                       IConnection connection = new Connection();
-
-                       connection.RedeliveryPolicy = 
this.redeliveryPolicy.Clone() as IRedeliveryPolicy;
-                       return connection;
-               }
-
-               /// <summary>
-               /// Get/or set the broker Uri.
-               /// </summary>
-               public Uri BrokerUri
-               {
-                       get { return brokerUri; }
-                       set { brokerUri = value; }
-               }
-
-               /// <summary>
-               /// Get/or set the redelivery policy that new IConnection 
objects are
-               /// assigned upon creation.
-               /// </summary>
-               public IRedeliveryPolicy RedeliveryPolicy
-               {
-                       get { return this.redeliveryPolicy; }
-                       set
-                       {
-                               if(value != null)
-                               {
-                                       this.redeliveryPolicy = value;
-                               }
-                       }
-               }
-       }
+    /// <summary>
+    /// A Factory that can estbalish NMS connections to MSMQ
+    /// </summary>
+    public class ConnectionFactory : IConnectionFactory
+    {
+        public const string DEFAULT_BROKER_URL = "msmq://localhost";
+        public const string ENV_BROKER_URL = "MSMQ_BROKER_URL";
+        private Uri brokerUri;
+        private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+
+        public static string GetDefaultBrokerUrl()
+        {
+            string answer = Environment.GetEnvironmentVariable(ENV_BROKER_URL);
+            if(answer == null)
+            {
+                answer = DEFAULT_BROKER_URL;
+            }
+            return answer;
+        }
+
+        public ConnectionFactory()
+            : this(GetDefaultBrokerUrl())
+        {
+        }
+
+        public ConnectionFactory(string brokerUri)
+            : this(brokerUri, null)
+        {
+        }
+
+        public ConnectionFactory(string brokerUri, string clientID)
+            : this(new Uri(brokerUri), clientID)
+        {
+        }
+
+        public ConnectionFactory(Uri brokerUri)
+            : this(brokerUri, null)
+        {
+        }
+
+        public ConnectionFactory(Uri brokerUri, string clientID)
+        {
+            this.brokerUri = brokerUri;
+        }
+
+        /// <summary>
+        /// Creates a new connection to MSMQ.
+        /// </summary>
+        public IConnection CreateConnection()
+        {
+            return CreateConnection(string.Empty, string.Empty, false);
+        }
+
+        /// <summary>
+        /// Creates a new connection to MSMQ.
+        /// </summary>
+        public IConnection CreateConnection(string userName, string password)
+        {
+            return CreateConnection(userName, password, false);
+        }
+
+        /// <summary>
+        /// Creates a new connection to MSMQ.
+        /// </summary>
+        public IConnection CreateConnection(string userName, string password, 
bool useLogging)
+        {
+            IConnection connection = new Connection();
+
+            connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as 
IRedeliveryPolicy;
+            connection.ConsumerTransformer = this.consumerTransformer;
+            connection.ProducerTransformer = this.producerTransformer;
+
+            return connection;
+        }
+
+        /// <summary>
+        /// Get/or set the broker Uri.
+        /// </summary>
+        public Uri BrokerUri
+        {
+            get { return brokerUri; }
+            set { brokerUri = value; }
+        }
+
+        /// <summary>
+        /// Get/or set the redelivery policy that new IConnection objects are
+        /// assigned upon creation.
+        /// </summary>
+        public IRedeliveryPolicy RedeliveryPolicy
+        {
+            get { return this.redeliveryPolicy; }
+            set
+            {
+                if(value != null)
+                {
+                    this.redeliveryPolicy = value;
+                }
+            }
+        }
+
+        private ConsumerTransformerDelegate consumerTransformer;
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get { return this.consumerTransformer; }
+            set { this.consumerTransformer = value; }
+        }
+
+        private ProducerTransformerDelegate producerTransformer;
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get { return this.producerTransformer; }
+            set { this.producerTransformer = value; }
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/48f3e707/trunk/src/main/csharp/MessageConsumer.cs
----------------------------------------------------------------------
diff --git a/trunk/src/main/csharp/MessageConsumer.cs 
b/trunk/src/main/csharp/MessageConsumer.cs
index ee57b96..708a372 100644
--- a/trunk/src/main/csharp/MessageConsumer.cs
+++ b/trunk/src/main/csharp/MessageConsumer.cs
@@ -21,211 +21,230 @@ using Apache.NMS.Util;
 
 namespace Apache.NMS.MSMQ
 {
-       /// <summary>
-       /// An object capable of receiving messages from some destination
-       /// </summary>
-       public class MessageConsumer : IMessageConsumer
-       {
-               protected TimeSpan zeroTimeout = new TimeSpan(0);
-
-               private readonly Session session;
-               private readonly AcknowledgementMode acknowledgementMode;
-               private MessageQueue messageQueue;
-               private event MessageListener listener;
-               private int listenerCount = 0;
-               private Thread asyncDeliveryThread = null;
-               private AutoResetEvent pause = new AutoResetEvent(false);
-               private Atomic<bool> asyncDelivery = new Atomic<bool>(false);
-
-               public MessageConsumer(Session session, AcknowledgementMode 
acknowledgementMode, MessageQueue messageQueue)
-               {
-                       this.session = session;
-                       this.acknowledgementMode = acknowledgementMode;
-                       this.messageQueue = messageQueue;
-                       if(null != this.messageQueue)
-                       {
-                               
this.messageQueue.MessageReadPropertyFilter.SetAll();
-                       }
-               }
-
-               public event MessageListener Listener
-               {
-                       add
-                       {
-                               listener += value;
-                               listenerCount++;
-                               StartAsyncDelivery();
-                       }
-
-                       remove
-                       {
-                               if(listenerCount > 0)
-                               {
-                                       listener -= value;
-                                       listenerCount--;
-                               }
-
-                               if(0 == listenerCount)
-                               {
-                                       StopAsyncDelivery();
-                               }
-                       }
-               }
-
-               public IMessage Receive()
-               {
-                       IMessage nmsMessage = null;
-
-                       if(messageQueue != null)
-                       {
-                               Message message;
-
-                               try
-                               {
-                                       message = 
messageQueue.Receive(zeroTimeout);
-                               }
-                               catch
-                               {
-                                       message = null;
-                               }
-
-                               if(null == message)
-                               {
-                                       ReceiveCompletedEventHandler receiveMsg 
=
-                                                       delegate(Object source, 
ReceiveCompletedEventArgs asyncResult) {
-                                                               message = 
messageQueue.EndReceive(asyncResult.AsyncResult);
-                                                               pause.Set();
-                                                       };
-
-                                       messageQueue.ReceiveCompleted += 
receiveMsg;
-                                       messageQueue.BeginReceive();
-                                       pause.WaitOne();
-                                       messageQueue.ReceiveCompleted -= 
receiveMsg;
-                               }
-
-                               nmsMessage = ToNmsMessage(message);
-                       }
-
-                       return nmsMessage;
-               }
-
-               public IMessage Receive(TimeSpan timeout)
-               {
-                       IMessage nmsMessage = null;
-
-                       if(messageQueue != null)
-                       {
-                               Message message = messageQueue.Receive(timeout);
-                               nmsMessage = ToNmsMessage(message);
-                       }
-
-                       return nmsMessage;
-               }
-
-               public IMessage ReceiveNoWait()
-               {
-                       IMessage nmsMessage = null;
-
-                       if(messageQueue != null)
-                       {
-                               Message message = 
messageQueue.Receive(zeroTimeout);
-                               nmsMessage = ToNmsMessage(message);
-                       }
-
-                       return nmsMessage;
-               }
-
-               public void Dispose()
-               {
-                       Close();
-               }
-
-               public void Close()
-               {
-                       StopAsyncDelivery();
-                       if(messageQueue != null)
-                       {
-                               messageQueue.Dispose();
-                               messageQueue = null;
-                       }
-               }
-
-               protected virtual void StopAsyncDelivery()
-               {
-                       if(asyncDelivery.CompareAndSet(true, false))
-                       {
-                               if(null != asyncDeliveryThread)
-                               {
-                                       Tracer.Info("Stopping async delivery 
thread.");
-                                       pause.Set();
-                                       if(!asyncDeliveryThread.Join(10000))
-                                       {
-                                               Tracer.Info("Aborting async 
delivery thread.");
-                                               asyncDeliveryThread.Abort();
-                                       }
-
-                                       asyncDeliveryThread = null;
-                                       Tracer.Info("Async delivery thread 
stopped.");
-                               }
-                       }
-               }
-
-               protected virtual void StartAsyncDelivery()
-               {
-                       if(asyncDelivery.CompareAndSet(false, true))
-                       {
-                               asyncDeliveryThread = new Thread(new 
ThreadStart(DispatchLoop));
-                               asyncDeliveryThread.Name = "Message Consumer 
Dispatch: " + messageQueue.QueueName;
-                               asyncDeliveryThread.IsBackground = true;
-                               asyncDeliveryThread.Start();
-                       }
-               }
-
-               protected virtual void DispatchLoop()
-               {
-                       Tracer.Info("Starting dispatcher thread consumer: " + 
this);
-                       while(asyncDelivery.Value)
-                       {
-                               try
-                               {
-                                       IMessage message = Receive();
-                                       if(asyncDelivery.Value && message != 
null)
-                                       {
-                                               try
-                                               {
-                                                       listener(message);
-                                               }
-                                               catch(Exception e)
-                                               {
-                                                       HandleAsyncException(e);
-                                               }
-                                       }
-                               }
-                               catch(ThreadAbortException ex)
-                               {
-                                       Tracer.InfoFormat("Thread abort 
received in thread: {0} : {1}", this, ex.Message);
-                                       break;
-                               }
-                               catch(Exception ex)
-                               {
-                                       Tracer.ErrorFormat("Exception while 
receiving message in thread: {0} : {1}", this, ex.Message);
-                               }
-                       }
-                       Tracer.Info("Stopping dispatcher thread consumer: " + 
this);
-               }
-
-               protected virtual void HandleAsyncException(Exception e)
-               {
-                       session.Connection.HandleException(e);
-               }
-
-               protected virtual IMessage ToNmsMessage(Message message)
-               {
-                       if(message == null)
-                       {
-                               return null;
-                       }
-                       return session.MessageConverter.ToNmsMessage(message);
-               }
-       }
+    /// <summary>
+    /// An object capable of receiving messages from some destination
+    /// </summary>
+    public class MessageConsumer : IMessageConsumer
+    {
+        protected TimeSpan zeroTimeout = new TimeSpan(0);
+
+        private readonly Session session;
+        private readonly AcknowledgementMode acknowledgementMode;
+        private MessageQueue messageQueue;
+        private event MessageListener listener;
+        private int listenerCount = 0;
+        private Thread asyncDeliveryThread = null;
+        private AutoResetEvent pause = new AutoResetEvent(false);
+        private Atomic<bool> asyncDelivery = new Atomic<bool>(false);
+
+        private ConsumerTransformerDelegate consumerTransformer;
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get { return this.consumerTransformer; }
+            set { this.consumerTransformer = value; }
+        }
+
+        public MessageConsumer(Session session, AcknowledgementMode 
acknowledgementMode, MessageQueue messageQueue)
+        {
+            this.session = session;
+            this.acknowledgementMode = acknowledgementMode;
+            this.messageQueue = messageQueue;
+            if(null != this.messageQueue)
+            {
+                this.messageQueue.MessageReadPropertyFilter.SetAll();
+            }
+        }
+
+        public event MessageListener Listener
+        {
+            add
+            {
+                listener += value;
+                listenerCount++;
+                StartAsyncDelivery();
+            }
+
+            remove
+            {
+                if(listenerCount > 0)
+                {
+                    listener -= value;
+                    listenerCount--;
+                }
+
+                if(0 == listenerCount)
+                {
+                    StopAsyncDelivery();
+                }
+            }
+        }
+
+        public IMessage Receive()
+        {
+            IMessage nmsMessage = null;
+
+            if(messageQueue != null)
+            {
+                Message message;
+
+                try
+                {
+                    message = messageQueue.Receive(zeroTimeout);
+                }
+                catch
+                {
+                    message = null;
+                }
+
+                if(null == message)
+                {
+                    ReceiveCompletedEventHandler receiveMsg =
+                            delegate(Object source, ReceiveCompletedEventArgs 
asyncResult) {
+                                message = 
messageQueue.EndReceive(asyncResult.AsyncResult);
+                                pause.Set();
+                            };
+
+                    messageQueue.ReceiveCompleted += receiveMsg;
+                    messageQueue.BeginReceive();
+                    pause.WaitOne();
+                    messageQueue.ReceiveCompleted -= receiveMsg;
+                }
+
+                nmsMessage = ToNmsMessage(message);
+            }
+
+            return nmsMessage;
+        }
+
+        public IMessage Receive(TimeSpan timeout)
+        {
+            IMessage nmsMessage = null;
+
+            if(messageQueue != null)
+            {
+                Message message = messageQueue.Receive(timeout);
+                nmsMessage = ToNmsMessage(message);
+            }
+
+            return nmsMessage;
+        }
+
+        public IMessage ReceiveNoWait()
+        {
+            IMessage nmsMessage = null;
+
+            if(messageQueue != null)
+            {
+                Message message = messageQueue.Receive(zeroTimeout);
+                nmsMessage = ToNmsMessage(message);
+            }
+
+            return nmsMessage;
+        }
+
+        public void Dispose()
+        {
+            Close();
+        }
+
+        public void Close()
+        {
+            StopAsyncDelivery();
+            if(messageQueue != null)
+            {
+                messageQueue.Dispose();
+                messageQueue = null;
+            }
+        }
+
+        protected virtual void StopAsyncDelivery()
+        {
+            if(asyncDelivery.CompareAndSet(true, false))
+            {
+                if(null != asyncDeliveryThread)
+                {
+                    Tracer.Info("Stopping async delivery thread.");
+                    pause.Set();
+                    if(!asyncDeliveryThread.Join(10000))
+                    {
+                        Tracer.Info("Aborting async delivery thread.");
+                        asyncDeliveryThread.Abort();
+                    }
+
+                    asyncDeliveryThread = null;
+                    Tracer.Info("Async delivery thread stopped.");
+                }
+            }
+        }
+
+        protected virtual void StartAsyncDelivery()
+        {
+            if(asyncDelivery.CompareAndSet(false, true))
+            {
+                asyncDeliveryThread = new Thread(new 
ThreadStart(DispatchLoop));
+                asyncDeliveryThread.Name = "Message Consumer Dispatch: " + 
messageQueue.QueueName;
+                asyncDeliveryThread.IsBackground = true;
+                asyncDeliveryThread.Start();
+            }
+        }
+
+        protected virtual void DispatchLoop()
+        {
+            Tracer.Info("Starting dispatcher thread consumer: " + this);
+            while(asyncDelivery.Value)
+            {
+                try
+                {
+                    IMessage message = Receive();
+                    if(asyncDelivery.Value && message != null)
+                    {
+                        try
+                        {
+                            listener(message);
+                        }
+                        catch(Exception e)
+                        {
+                            HandleAsyncException(e);
+                        }
+                    }
+                }
+                catch(ThreadAbortException ex)
+                {
+                    Tracer.InfoFormat("Thread abort received in thread: {0} : 
{1}", this, ex.Message);
+                    break;
+                }
+                catch(Exception ex)
+                {
+                    Tracer.ErrorFormat("Exception while receiving message in 
thread: {0} : {1}", this, ex.Message);
+                }
+            }
+            Tracer.Info("Stopping dispatcher thread consumer: " + this);
+        }
+
+        protected virtual void HandleAsyncException(Exception e)
+        {
+            session.Connection.HandleException(e);
+        }
+
+        protected virtual IMessage ToNmsMessage(Message message)
+        {
+            if(message == null)
+            {
+                return null;
+            }
+
+            IMessage converted = 
session.MessageConverter.ToNmsMessage(message);
+
+            if(this.ConsumerTransformer != null)
+            {
+                IMessage newMessage = ConsumerTransformer(this.session, this, 
message);
+                if(newMessage != null)
+                {
+                    converted = newMessage;
+                }
+            }
+
+            return converted;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/48f3e707/trunk/src/main/csharp/MessageProducer.cs
----------------------------------------------------------------------
diff --git a/trunk/src/main/csharp/MessageProducer.cs 
b/trunk/src/main/csharp/MessageProducer.cs
index 26598d3..5912c07 100644
--- a/trunk/src/main/csharp/MessageProducer.cs
+++ b/trunk/src/main/csharp/MessageProducer.cs
@@ -19,258 +19,274 @@ using System.Messaging;
 
 namespace Apache.NMS.MSMQ
 {
-       /// <summary>
-       /// An object capable of sending messages to some destination
-       /// </summary>
-       public class MessageProducer : IMessageProducer
-       {
-
-               private readonly Session session;
-               private Destination destination;
-
-               //private long messageCounter;
-               private MsgDeliveryMode deliveryMode;
-               private TimeSpan timeToLive;
-               private MsgPriority priority;
-               private bool disableMessageID;
-               private bool disableMessageTimestamp;
-
-               private MessageQueue messageQueue;
-
-               public MessageProducer(Session session, Destination destination)
-               {
-                       this.session = session;
-                       this.destination = destination;
-                       if(destination != null)
-                       {
-                               messageQueue = openMessageQueue(destination);
-                       }
-               }
-
-               private MessageQueue openMessageQueue(Destination dest)
-               {
-                       MessageQueue rc = null;
-                       try
-                       {
-                               if(!MessageQueue.Exists(dest.Path))
-                               {
-                                       // create the new message queue and 
make it transactional
-                                       rc = MessageQueue.Create(dest.Path, 
session.Transacted);
-                                       this.destination.Path = rc.Path;
-                               }
-                               else
-                               {
-                                       rc = new MessageQueue(dest.Path);
-                                       this.destination.Path = rc.Path;
-                                       if(!rc.CanWrite)
-                                       {
-                                               throw new 
NMSSecurityException("Do not have write access to: " + dest);
-                                       }
-                               }
-                       }
-                       catch(Exception e)
-                       {
-                               if(rc != null)
-                               {
-                                       rc.Dispose();
-                               }
-
-                               throw new NMSException(e.Message + ": " + dest, 
e);
-                       }
-                       return rc;
-               }
-
-               public void Send(IMessage message)
-               {
-                       Send(Destination, message);
-               }
-
-               public void Send(IMessage message, MsgDeliveryMode 
deliveryMode, MsgPriority priority, TimeSpan timeToLive)
-               {
-                       Send(Destination, message, deliveryMode, priority, 
timeToLive);
-               }
-
-               public void Send(IDestination destination, IMessage message)
-               {
-                       Send(destination, message, DeliveryMode, Priority, 
TimeToLive);
-               }
-
-               public void Send(IDestination destination, IMessage imessage, 
MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
-               {
-                       BaseMessage message = (BaseMessage) imessage;
-                       MessageQueue mq = null;
-
-                       try
-                       {
-                               // Locate the MSMQ Queue we will be sending to
-                               if(messageQueue != null)
-                               {
-                                       if(destination.Equals(this.destination))
-                                       {
-                                               mq = messageQueue;
-                                       }
-                                       else
-                                       {
-                                               throw new NMSException("This 
producer can only be used to send to: " + destination);
-                                       }
-                               }
-                               else
-                               {
-                                       mq = openMessageQueue((Destination) 
destination);
-                               }
-
-                               message.NMSDeliveryMode = deliveryMode;
-                               message.NMSTimeToLive = timeToLive;
-                               message.NMSPriority = priority;
-                               if(!DisableMessageTimestamp)
-                               {
-                                       message.NMSTimestamp = DateTime.UtcNow;
-                               }
-
-                               if(!DisableMessageID)
-                               {
-                                       // TODO: message.NMSMessageId =
-                               }
-
-                               // Convert the Mesasge into a MSMQ message
-                               Message msg = 
session.MessageConverter.ToMsmqMessage(message);
-
-                               if(mq.Transactional)
-                               {
-                                       if(session.Transacted)
-                                       {
-                                               mq.Send(msg, 
session.MessageQueueTransaction);
-
-                                       }
-                                       else
-                                       {
-                                               // Start our own mini 
transaction here to send the message.
-                                               using(MessageQueueTransaction 
transaction = new MessageQueueTransaction())
-                                               {
-                                                       transaction.Begin();
-                                                       mq.Send(msg, 
transaction);
-                                                       transaction.Commit();
-                                               }
-                                       }
-                               }
-                               else
-                               {
-                                       if(session.Transacted)
-                                       {
-                                               // We may want to raise an 
exception here since app requested
-                                               // a transeced NMS session, but 
is using a non transacted message queue
-                                               // For now silently ignore it.
-                                       }
-                                       mq.Send(msg);
-                               }
-
-                       }
-                       finally
-                       {
-                               if(mq != null && mq != messageQueue)
-                               {
-                                       mq.Dispose();
-                               }
-                       }
-               }
-
-               public void Close()
-               {
-                       if(messageQueue != null)
-                       {
-                               messageQueue.Dispose();
-                               messageQueue = null;
-                       }
-               }
-
-               public void Dispose()
-               {
-                       Close();
-               }
-
-               public IMessage CreateMessage()
-               {
-                       return session.CreateMessage();
-               }
-
-               public ITextMessage CreateTextMessage()
-               {
-                       return session.CreateTextMessage();
-               }
-
-               public ITextMessage CreateTextMessage(String text)
-               {
-                       return session.CreateTextMessage(text);
-               }
-
-               public IMapMessage CreateMapMessage()
-               {
-                       return session.CreateMapMessage();
-               }
-
-               public IObjectMessage CreateObjectMessage(Object body)
-               {
-                       return session.CreateObjectMessage(body);
-               }
-
-               public IBytesMessage CreateBytesMessage()
-               {
-                       return session.CreateBytesMessage();
-               }
-
-               public IBytesMessage CreateBytesMessage(byte[] body)
-               {
-                       return session.CreateBytesMessage(body);
-               }
-
-               public IStreamMessage CreateStreamMessage()
-               {
-                       return session.CreateStreamMessage();
-               }
-
-               public MsgDeliveryMode DeliveryMode
-               {
-                       get { return deliveryMode; }
-                       set { deliveryMode = value; }
-               }
-
-               public TimeSpan TimeToLive
-               {
-                       get { return timeToLive; }
-                       set { timeToLive = value; }
-               }
-
-               /// <summary>
-               /// The default timeout for network requests.
-               /// </summary>
-               public TimeSpan RequestTimeout
-               {
-                       get { return NMSConstants.defaultRequestTimeout; }
-                       set { }
-               }
-
-               public IDestination Destination
-               {
-                       get { return destination; }
-                       set { destination = (Destination) value; }
-               }
-
-               public MsgPriority Priority
-               {
-                       get { return priority; }
-                       set { priority = value; }
-               }
-
-               public bool DisableMessageID
-               {
-                       get { return disableMessageID; }
-                       set { disableMessageID = value; }
-               }
-
-               public bool DisableMessageTimestamp
-               {
-                       get { return disableMessageTimestamp; }
-                       set { disableMessageTimestamp = value; }
-               }
-       }
+    /// <summary>
+    /// An object capable of sending messages to some destination
+    /// </summary>
+    public class MessageProducer : IMessageProducer
+    {
+
+        private readonly Session session;
+        private Destination destination;
+
+        //private long messageCounter;
+        private MsgDeliveryMode deliveryMode;
+        private TimeSpan timeToLive;
+        private MsgPriority priority;
+        private bool disableMessageID;
+        private bool disableMessageTimestamp;
+
+        private MessageQueue messageQueue;
+
+        private ProducerTransformerDelegate producerTransformer;
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get { return this.producerTransformer; }
+            set { this.producerTransformer = value; }
+        }
+
+        public MessageProducer(Session session, Destination destination)
+        {
+            this.session = session;
+            this.destination = destination;
+            if(destination != null)
+            {
+                messageQueue = openMessageQueue(destination);
+            }
+        }
+
+        private MessageQueue openMessageQueue(Destination dest)
+        {
+            MessageQueue rc = null;
+            try
+            {
+                if(!MessageQueue.Exists(dest.Path))
+                {
+                    // create the new message queue and make it transactional
+                    rc = MessageQueue.Create(dest.Path, session.Transacted);
+                    this.destination.Path = rc.Path;
+                }
+                else
+                {
+                    rc = new MessageQueue(dest.Path);
+                    this.destination.Path = rc.Path;
+                    if(!rc.CanWrite)
+                    {
+                        throw new NMSSecurityException("Do not have write 
access to: " + dest);
+                    }
+                }
+            }
+            catch(Exception e)
+            {
+                if(rc != null)
+                {
+                    rc.Dispose();
+                }
+
+                throw new NMSException(e.Message + ": " + dest, e);
+            }
+            return rc;
+        }
+
+        public void Send(IMessage message)
+        {
+            Send(Destination, message);
+        }
+
+        public void Send(IMessage message, MsgDeliveryMode deliveryMode, 
MsgPriority priority, TimeSpan timeToLive)
+        {
+            Send(Destination, message, deliveryMode, priority, timeToLive);
+        }
+
+        public void Send(IDestination destination, IMessage message)
+        {
+            Send(destination, message, DeliveryMode, Priority, TimeToLive);
+        }
+
+        public void Send(IDestination destination, IMessage imessage, 
MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+        {
+            BaseMessage message = (BaseMessage) imessage;
+            MessageQueue mq = null;
+
+            try
+            {
+                // Locate the MSMQ Queue we will be sending to
+                if(messageQueue != null)
+                {
+                    if(destination.Equals(this.destination))
+                    {
+                        mq = messageQueue;
+                    }
+                    else
+                    {
+                        throw new NMSException("This producer can only be used 
to send to: " + destination);
+                    }
+                }
+                else
+                {
+                    mq = openMessageQueue((Destination) destination);
+                }
+
+                if(this.ProducerTransformer != null)
+                {
+                    IMessage transformed = 
this.ProducerTransformer(this.session, this, message);
+                    if(transformed != null)
+                    {
+                        message = transformed;
+                    }
+                }
+
+                message.NMSDeliveryMode = deliveryMode;
+                message.NMSTimeToLive = timeToLive;
+                message.NMSPriority = priority;
+                if(!DisableMessageTimestamp)
+                {
+                    message.NMSTimestamp = DateTime.UtcNow;
+                }
+
+                if(!DisableMessageID)
+                {
+                    // TODO: message.NMSMessageId =
+                }
+
+                // Convert the Mesasge into a MSMQ message
+                Message msg = session.MessageConverter.ToMsmqMessage(message);
+
+                if(mq.Transactional)
+                {
+                    if(session.Transacted)
+                    {
+                        mq.Send(msg, session.MessageQueueTransaction);
+
+                    }
+                    else
+                    {
+                        // Start our own mini transaction here to send the 
message.
+                        using(MessageQueueTransaction transaction = new 
MessageQueueTransaction())
+                        {
+                            transaction.Begin();
+                            mq.Send(msg, transaction);
+                            transaction.Commit();
+                        }
+                    }
+                }
+                else
+                {
+                    if(session.Transacted)
+                    {
+                        // We may want to raise an exception here since app 
requested
+                        // a transeced NMS session, but is using a non 
transacted message queue
+                        // For now silently ignore it.
+                    }
+                    mq.Send(msg);
+                }
+
+            }
+            finally
+            {
+                if(mq != null && mq != messageQueue)
+                {
+                    mq.Dispose();
+                }
+            }
+        }
+
+        public void Close()
+        {
+            if(messageQueue != null)
+            {
+                messageQueue.Dispose();
+                messageQueue = null;
+            }
+        }
+
+        public void Dispose()
+        {
+            Close();
+        }
+
+        public IMessage CreateMessage()
+        {
+            return session.CreateMessage();
+        }
+
+        public ITextMessage CreateTextMessage()
+        {
+            return session.CreateTextMessage();
+        }
+
+        public ITextMessage CreateTextMessage(String text)
+        {
+            return session.CreateTextMessage(text);
+        }
+
+        public IMapMessage CreateMapMessage()
+        {
+            return session.CreateMapMessage();
+        }
+
+        public IObjectMessage CreateObjectMessage(Object body)
+        {
+            return session.CreateObjectMessage(body);
+        }
+
+        public IBytesMessage CreateBytesMessage()
+        {
+            return session.CreateBytesMessage();
+        }
+
+        public IBytesMessage CreateBytesMessage(byte[] body)
+        {
+            return session.CreateBytesMessage(body);
+        }
+
+        public IStreamMessage CreateStreamMessage()
+        {
+            return session.CreateStreamMessage();
+        }
+
+        public MsgDeliveryMode DeliveryMode
+        {
+            get { return deliveryMode; }
+            set { deliveryMode = value; }
+        }
+
+        public TimeSpan TimeToLive
+        {
+            get { return timeToLive; }
+            set { timeToLive = value; }
+        }
+
+        /// <summary>
+        /// The default timeout for network requests.
+        /// </summary>
+        public TimeSpan RequestTimeout
+        {
+            get { return NMSConstants.defaultRequestTimeout; }
+            set { }
+        }
+
+        public IDestination Destination
+        {
+            get { return destination; }
+            set { destination = (Destination) value; }
+        }
+
+        public MsgPriority Priority
+        {
+            get { return priority; }
+            set { priority = value; }
+        }
+
+        public bool DisableMessageID
+        {
+            get { return disableMessageID; }
+            set { disableMessageID = value; }
+        }
+
+        public bool DisableMessageTimestamp
+        {
+            get { return disableMessageTimestamp; }
+            set { disableMessageTimestamp = value; }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/48f3e707/trunk/src/main/csharp/Session.cs
----------------------------------------------------------------------
diff --git a/trunk/src/main/csharp/Session.cs b/trunk/src/main/csharp/Session.cs
index 6ee3d9e..71ee69c 100644
--- a/trunk/src/main/csharp/Session.cs
+++ b/trunk/src/main/csharp/Session.cs
@@ -19,229 +19,243 @@ using System.Messaging;
 
 namespace Apache.NMS.MSMQ
 {
-       /// <summary>
-       /// MSQM provider of ISession
-       /// </summary>
-       public class Session : ISession
-       {
-               private Connection connection;
-               private AcknowledgementMode acknowledgementMode;
-               private MessageQueueTransaction messageQueueTransaction;
-               private IMessageConverter messageConverter;
-
-               public Session(Connection connection, AcknowledgementMode 
acknowledgementMode)
-               {
-                       this.connection = connection;
-                       this.acknowledgementMode = acknowledgementMode;
-                       MessageConverter = connection.MessageConverter;
-                       if(this.acknowledgementMode == 
AcknowledgementMode.Transactional)
-                       {
-                               MessageQueueTransaction = new 
MessageQueueTransaction();
-                       }
-               }
-
-               public void Dispose()
-               {
-                       if(MessageQueueTransaction != null)
-                       {
-                               MessageQueueTransaction.Dispose();
-                       }
-               }
-
-               public IMessageProducer CreateProducer()
-               {
-                       return CreateProducer(null);
-               }
-
-               public IMessageProducer CreateProducer(IDestination destination)
-               {
-                       return new MessageProducer(this, (Destination) 
destination);
-               }
-
-               public IMessageConsumer CreateConsumer(IDestination destination)
-               {
-                       return CreateConsumer(destination, null);
-               }
-
-               public IMessageConsumer CreateConsumer(IDestination 
destination, string selector)
-               {
-                       return CreateConsumer(destination, selector, false);
-               }
-
-               public IMessageConsumer CreateConsumer(IDestination 
destination, string selector, bool noLocal)
-               {
-                       if(selector != null)
-                       {
-                               throw new NotSupportedException("Selectors are 
not supported by MSMQ");
-                       }
-                       MessageQueue queue = 
MessageConverter.ToMsmqDestination(destination);
-                       return new MessageConsumer(this, acknowledgementMode, 
queue);
-               }
-
-               public IMessageConsumer CreateDurableConsumer(ITopic 
destination, string name, string selector, bool noLocal)
-               {
-                       throw new NotSupportedException("Durable Topic 
subscribers are not supported by MSMQ");
-               }
-
-               public void DeleteDurableConsumer(string name)
-               {
-                       throw new NotSupportedException("Durable Topic 
subscribers are not supported by MSMQ");
-               }
-
-               public IQueueBrowser CreateBrowser(IQueue queue)
-               {
-                       throw new NotImplementedException();
-               }
-
-               public IQueueBrowser CreateBrowser(IQueue queue, string 
selector)
-               {
-                       throw new NotImplementedException();
-               }
-
-               public IQueue GetQueue(string name)
-               {
-                       return new Queue(name);
-               }
-
-               public ITopic GetTopic(string name)
-               {
-                       throw new NotSupportedException("Topics are not 
supported by MSMQ");
-               }
-
-               public ITemporaryQueue CreateTemporaryQueue()
-               {
-                       throw new NotSupportedException("Tempoary Queues are 
not supported by MSMQ");
-               }
-
-               public ITemporaryTopic CreateTemporaryTopic()
-               {
-                       throw new NotSupportedException("Tempoary Topics are 
not supported by MSMQ");
-               }
-
-               /// <summary>
-               /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic).
-               /// </summary>
-               public void DeleteDestination(IDestination destination)
-               {
-                       // TODO: Implement if possible.  If not possible, then 
change exception to NotSupportedException().
-                       throw new NotImplementedException();
-               }
-
-               public IMessage CreateMessage()
-               {
-                       BaseMessage answer = new BaseMessage();
-                       return answer;
-               }
-
-
-               public ITextMessage CreateTextMessage()
-               {
-                       TextMessage answer = new TextMessage();
-                       return answer;
-               }
-
-               public ITextMessage CreateTextMessage(string text)
-               {
-                       TextMessage answer = new TextMessage(text);
-                       return answer;
-               }
-
-               public IMapMessage CreateMapMessage()
-               {
-                       return new MapMessage();
-               }
-
-               public IBytesMessage CreateBytesMessage()
-               {
-                       return new BytesMessage();
-               }
-
-               public IBytesMessage CreateBytesMessage(byte[] body)
-               {
-                       BytesMessage answer = new BytesMessage();
-                       answer.Content = body;
-                       return answer;
-               }
-
-               public IStreamMessage CreateStreamMessage()
-               {
-                       return new StreamMessage();
-               }
-
-               public IObjectMessage CreateObjectMessage(Object body)
-               {
-                       ObjectMessage answer = new ObjectMessage();
-                       answer.Body = body;
-                       return answer;
-               }
-
-               public void Commit()
-               {
-                       if(!Transacted)
-                       {
-                               throw new InvalidOperationException("You cannot 
perform a Commit() on a non-transacted session. Acknowlegement mode is: " + 
acknowledgementMode);
-                       }
-                       messageQueueTransaction.Commit();
-               }
-
-               public void Rollback()
-               {
-                       if(!Transacted)
-                       {
-                               throw new InvalidOperationException("You cannot 
perform a Commit() on a non-transacted session. Acknowlegement mode is: " + 
acknowledgementMode);
-                       }
-                       messageQueueTransaction.Abort();
-               }
-
-               // Properties
-               public Connection Connection
-               {
-                       get { return connection; }
-               }
-
-               /// <summary>
-               /// The default timeout for network requests.
-               /// </summary>
-               public TimeSpan RequestTimeout
-               {
-                       get { return NMSConstants.defaultRequestTimeout; }
-                       set { }
-               }
-
-               public bool Transacted
-               {
-                       get { return acknowledgementMode == 
AcknowledgementMode.Transactional; }
-               }
-
-               public AcknowledgementMode AcknowledgementMode
-               {
-                       get { throw new NotImplementedException(); }
-               }
-
-               public MessageQueueTransaction MessageQueueTransaction
-               {
-                       get
-                       {
-                               if(null != messageQueueTransaction
-                                       && messageQueueTransaction.Status != 
MessageQueueTransactionStatus.Pending)
-                               {
-                                       messageQueueTransaction.Begin();
-                               }
-
-                               return messageQueueTransaction;
-                       }
-                       set { messageQueueTransaction = value; }
-               }
-
-               public IMessageConverter MessageConverter
-               {
-                       get { return messageConverter; }
-                       set { messageConverter = value; }
-               }
-
-               public void Close()
-               {
-                       Dispose();
-               }
-       }
+    /// <summary>
+    /// MSQM provider of ISession
+    /// </summary>
+    public class Session : ISession
+    {
+        private Connection connection;
+        private AcknowledgementMode acknowledgementMode;
+        private MessageQueueTransaction messageQueueTransaction;
+        private IMessageConverter messageConverter;
+
+        public Session(Connection connection, AcknowledgementMode 
acknowledgementMode)
+        {
+            this.connection = connection;
+            this.acknowledgementMode = acknowledgementMode;
+            MessageConverter = connection.MessageConverter;
+            if(this.acknowledgementMode == AcknowledgementMode.Transactional)
+            {
+                MessageQueueTransaction = new MessageQueueTransaction();
+            }
+        }
+
+        public void Dispose()
+        {
+            if(MessageQueueTransaction != null)
+            {
+                MessageQueueTransaction.Dispose();
+            }
+        }
+
+        public IMessageProducer CreateProducer()
+        {
+            return CreateProducer(null);
+        }
+
+        public IMessageProducer CreateProducer(IDestination destination)
+        {
+            return new MessageProducer(this, (Destination) destination);
+        }
+
+        public IMessageConsumer CreateConsumer(IDestination destination)
+        {
+            return CreateConsumer(destination, null);
+        }
+
+        public IMessageConsumer CreateConsumer(IDestination destination, 
string selector)
+        {
+            return CreateConsumer(destination, selector, false);
+        }
+
+        public IMessageConsumer CreateConsumer(IDestination destination, 
string selector, bool noLocal)
+        {
+            if(selector != null)
+            {
+                throw new NotSupportedException("Selectors are not supported 
by MSMQ");
+            }
+            MessageQueue queue = 
MessageConverter.ToMsmqDestination(destination);
+            return new MessageConsumer(this, acknowledgementMode, queue);
+        }
+
+        public IMessageConsumer CreateDurableConsumer(ITopic destination, 
string name, string selector, bool noLocal)
+        {
+            throw new NotSupportedException("Durable Topic subscribers are not 
supported by MSMQ");
+        }
+
+        public void DeleteDurableConsumer(string name)
+        {
+            throw new NotSupportedException("Durable Topic subscribers are not 
supported by MSMQ");
+        }
+
+        public IQueueBrowser CreateBrowser(IQueue queue)
+        {
+            throw new NotImplementedException();
+        }
+
+        public IQueueBrowser CreateBrowser(IQueue queue, string selector)
+        {
+            throw new NotImplementedException();
+        }
+
+        public IQueue GetQueue(string name)
+        {
+            return new Queue(name);
+        }
+
+        public ITopic GetTopic(string name)
+        {
+            throw new NotSupportedException("Topics are not supported by 
MSMQ");
+        }
+
+        public ITemporaryQueue CreateTemporaryQueue()
+        {
+            throw new NotSupportedException("Tempoary Queues are not supported 
by MSMQ");
+        }
+
+        public ITemporaryTopic CreateTemporaryTopic()
+        {
+            throw new NotSupportedException("Tempoary Topics are not supported 
by MSMQ");
+        }
+
+        /// <summary>
+        /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic).
+        /// </summary>
+        public void DeleteDestination(IDestination destination)
+        {
+            // TODO: Implement if possible.  If not possible, then change 
exception to NotSupportedException().
+            throw new NotImplementedException();
+        }
+
+        public IMessage CreateMessage()
+        {
+            BaseMessage answer = new BaseMessage();
+            return answer;
+        }
+
+
+        public ITextMessage CreateTextMessage()
+        {
+            TextMessage answer = new TextMessage();
+            return answer;
+        }
+
+        public ITextMessage CreateTextMessage(string text)
+        {
+            TextMessage answer = new TextMessage(text);
+            return answer;
+        }
+
+        public IMapMessage CreateMapMessage()
+        {
+            return new MapMessage();
+        }
+
+        public IBytesMessage CreateBytesMessage()
+        {
+            return new BytesMessage();
+        }
+
+        public IBytesMessage CreateBytesMessage(byte[] body)
+        {
+            BytesMessage answer = new BytesMessage();
+            answer.Content = body;
+            return answer;
+        }
+
+        public IStreamMessage CreateStreamMessage()
+        {
+            return new StreamMessage();
+        }
+
+        public IObjectMessage CreateObjectMessage(Object body)
+        {
+            ObjectMessage answer = new ObjectMessage();
+            answer.Body = body;
+            return answer;
+        }
+
+        public void Commit()
+        {
+            if(!Transacted)
+            {
+                throw new InvalidOperationException("You cannot perform a 
Commit() on a non-transacted session. Acknowlegement mode is: " + 
acknowledgementMode);
+            }
+            messageQueueTransaction.Commit();
+        }
+
+        public void Rollback()
+        {
+            if(!Transacted)
+            {
+                throw new InvalidOperationException("You cannot perform a 
Commit() on a non-transacted session. Acknowlegement mode is: " + 
acknowledgementMode);
+            }
+            messageQueueTransaction.Abort();
+        }
+
+        // Properties
+        public Connection Connection
+        {
+            get { return connection; }
+        }
+
+        /// <summary>
+        /// The default timeout for network requests.
+        /// </summary>
+        public TimeSpan RequestTimeout
+        {
+            get { return NMSConstants.defaultRequestTimeout; }
+            set { }
+        }
+
+        public bool Transacted
+        {
+            get { return acknowledgementMode == 
AcknowledgementMode.Transactional; }
+        }
+
+        public AcknowledgementMode AcknowledgementMode
+        {
+            get { throw new NotImplementedException(); }
+        }
+
+        public MessageQueueTransaction MessageQueueTransaction
+        {
+            get
+            {
+                if(null != messageQueueTransaction
+                    && messageQueueTransaction.Status != 
MessageQueueTransactionStatus.Pending)
+                {
+                    messageQueueTransaction.Begin();
+                }
+
+                return messageQueueTransaction;
+            }
+            set { messageQueueTransaction = value; }
+        }
+
+        public IMessageConverter MessageConverter
+        {
+            get { return messageConverter; }
+            set { messageConverter = value; }
+        }
+
+        private ConsumerTransformerDelegate consumerTransformer;
+        public ConsumerTransformerDelegate ConsumerTransformer
+        {
+            get { return this.consumerTransformer; }
+            set { this.consumerTransformer = value; }
+        }
+
+        private ProducerTransformerDelegate producerTransformer;
+        public ProducerTransformerDelegate ProducerTransformer
+        {
+            get { return this.producerTransformer; }
+            set { this.producerTransformer = value; }
+        }
+
+        public void Close()
+        {
+            Dispose();
+        }
+    }
 }

Reply via email to