Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=799407&r1=799406&r2=799407&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs 
(original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs 
Thu Jul 30 19:06:34 2009
@@ -23,610 +23,608 @@
 
 namespace Apache.NMS.ActiveMQ
 {
-       /// <summary>
-       /// Default provider of ISession
-       /// </summary>
-       public class Session : ISession
-       {
-               /// <summary>
-               /// Private object used for synchronization, instead of public 
"this"
-               /// </summary>
-               private readonly object myLock = new object();
-               private int consumerCounter;
-               private readonly IDictionary consumers = 
Hashtable.Synchronized(new Hashtable());
-               private readonly IDictionary producers = 
Hashtable.Synchronized(new Hashtable());
-               private readonly DispatchingThread dispatchingThread;
-               private DispatchingThread.ExceptionHandler 
dispatchingThread_ExceptionHandler;
-               private readonly SessionInfo info;
-               private int producerCounter;
-               internal bool startedAsyncDelivery = false;
-               private bool disposed = false;
-               private bool closed = false;
-               private bool closing = false;
-               private TimeSpan MAX_THREAD_WAIT = 
TimeSpan.FromMilliseconds(30000);
-
-               public Session(Connection connection, SessionInfo info, 
AcknowledgementMode acknowledgementMode)
-               {
-                       this.connection = connection;
-                       this.info = info;
-                       this.acknowledgementMode = acknowledgementMode;
-                       this.AsyncSend = connection.AsyncSend;
-                       this.requestTimeout = connection.RequestTimeout;
-                       this.PrefetchSize = 1000;
-                       this.transactionContext = new TransactionContext(this);
-                       this.dispatchingThread = new DispatchingThread(new 
DispatchingThread.DispatchFunction(DispatchAsyncMessages));
-                       this.dispatchingThread_ExceptionHandler = new 
DispatchingThread.ExceptionHandler(dispatchingThread_ExceptionListener);
-               }
-
-               ~Session()
-               {
-                       Dispose(false);
-               }
-
-               /// <summary>
-               /// Sets the prefetch size, the maximum number of messages a 
broker will dispatch to consumers
-               /// until acknowledgements are received.
-               /// </summary>
-               public int PrefetchSize;
-
-               /// <summary>
-               /// Sets the maximum number of messages to keep around per 
consumer
-               /// in addition to the prefetch window for non-durable topics 
until messages
-               /// will start to be evicted for slow consumers.
-               /// Must be > 0 to enable this feature
-               /// </summary>
-               public int MaximumPendingMessageLimit;
-
-               /// <summary>
-               /// Enables or disables whether asynchronous dispatch should be 
used by the broker
-               /// </summary>
-               public bool DispatchAsync;
-
-               /// <summary>
-               /// Enables or disables exclusive consumers when using queues. 
An exclusive consumer means
-               /// only one instance of a consumer is allowed to process 
messages on a queue to preserve order
-               /// </summary>
-               public bool Exclusive;
-
-               /// <summary>
-               /// Enables or disables retroactive mode for consumers; i.e. do 
they go back in time or not?
-               /// </summary>
-               public bool Retroactive;
-
-               /// <summary>
-               /// Sets the default consumer priority for consumers
-               /// </summary>
-               public byte Priority;
-
-               /// <summary>
-               /// This property indicates whether or not async send is 
enabled.
-               /// </summary>
-               public bool AsyncSend;
-
-               private Connection connection;
-               public Connection Connection
-               {
-                       get { return this.connection; }
-               }
-
-               public SessionId SessionId
-               {
-                       get { return info.SessionId; }
-               }
-
-               private TransactionContext transactionContext;
-               public TransactionContext TransactionContext
-               {
-                       get { return this.transactionContext; }
-               }
-
-               #region ISession Members
-
-               public void Dispose()
-               {
-                       Dispose(true);
-                       GC.SuppressFinalize(this);
-               }
-
-               protected void Dispose(bool disposing)
-               {
-                       if(this.disposed)
-                       {
-                               return;
-                       }
-
-                       if(disposing)
-                       {
-                               // Dispose managed code here.
-                       }
-
-                       try
-                       {
-                               Close();
-                       }
-                       catch
-                       {
-                               // Ignore network errors.
-                       }
-
-                       this.disposed = true;
-               }
-
-               public void Close()
-               {
-                       lock(myLock)
-                       {
-                               if(this.closed)
-                               {
-                                       return;
-                               }
-
-                               try
-                               {
-                                       this.closing = true;
-                                       StopAsyncDelivery();
-                                       lock(consumers.SyncRoot)
-                                       {
-                                               foreach(MessageConsumer 
consumer in consumers.Values)
-                                               {
-                                                       consumer.Close();
-                                               }
-                                       }
-                                       consumers.Clear();
-
-                                       lock(producers.SyncRoot)
-                                       {
-                                               foreach(MessageProducer 
producer in producers.Values)
-                                               {
-                                                       producer.Close();
-                                               }
-                                       }
-                                       producers.Clear();
+    /// <summary>
+    /// Default provider of ISession
+    /// </summary>
+    public class Session : ISession
+    {
+        /// <summary>
+        /// Private object used for synchronization, instead of public "this"
+        /// </summary>
+        private readonly object myLock = new object();
+        private int consumerCounter;
+        private readonly IDictionary consumers = Hashtable.Synchronized(new 
Hashtable());
+        private readonly IDictionary producers = Hashtable.Synchronized(new 
Hashtable());
+        private readonly DispatchingThread dispatchingThread;
+        private DispatchingThread.ExceptionHandler 
dispatchingThread_ExceptionHandler;
+        private readonly SessionInfo info;
+        private int producerCounter;
+        internal bool startedAsyncDelivery = false;
+        private bool disposed = false;
+        private bool closed = false;
+        private bool closing = false;
+        private TimeSpan MAX_THREAD_WAIT = TimeSpan.FromMilliseconds(30000);
+
+        public Session(Connection connection, SessionInfo info, 
AcknowledgementMode acknowledgementMode)
+        {
+            this.connection = connection;
+            this.info = info;
+            this.acknowledgementMode = acknowledgementMode;
+            this.AsyncSend = connection.AsyncSend;
+            this.requestTimeout = connection.RequestTimeout;
+            this.PrefetchSize = 1000;
+            this.transactionContext = new TransactionContext(this);
+            this.dispatchingThread = new DispatchingThread(new 
DispatchingThread.DispatchFunction(DispatchAsyncMessages));
+            this.dispatchingThread_ExceptionHandler = new 
DispatchingThread.ExceptionHandler(dispatchingThread_ExceptionListener);
+        }
+
+        ~Session()
+        {
+            Dispose(false);
+        }
+
+        /// <summary>
+        /// Sets the prefetch size, the maximum number of messages a broker 
will dispatch to consumers
+        /// until acknowledgements are received.
+        /// </summary>
+        public int PrefetchSize;
+
+        /// <summary>
+        /// Sets the maximum number of messages to keep around per consumer
+        /// in addition to the prefetch window for non-durable topics until 
messages
+        /// will start to be evicted for slow consumers.
+        /// Must be > 0 to enable this feature
+        /// </summary>
+        public int MaximumPendingMessageLimit;
+
+        /// <summary>
+        /// Enables or disables whether asynchronous dispatch should be used 
by the broker
+        /// </summary>
+        public bool DispatchAsync;
+
+        /// <summary>
+        /// Enables or disables exclusive consumers when using queues. An 
exclusive consumer means
+        /// only one instance of a consumer is allowed to process messages on 
a queue to preserve order
+        /// </summary>
+        public bool Exclusive;
+
+        /// <summary>
+        /// Enables or disables retroactive mode for consumers; i.e. do they 
go back in time or not?
+        /// </summary>
+        public bool Retroactive;
+
+        /// <summary>
+        /// Sets the default consumer priority for consumers
+        /// </summary>
+        public byte Priority;
+
+        /// <summary>
+        /// This property indicates whether or not async send is enabled.
+        /// </summary>
+        public bool AsyncSend;
+
+        private Connection connection;
+        public Connection Connection
+        {
+            get { return this.connection; }
+        }
+
+        public SessionId SessionId
+        {
+            get { return info.SessionId; }
+        }
+
+        private TransactionContext transactionContext;
+        public TransactionContext TransactionContext
+        {
+            get { return this.transactionContext; }
+        }
+
+        #region ISession Members
+
+        public void Dispose()
+        {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        protected void Dispose(bool disposing)
+        {
+            if(this.disposed)
+            {
+                return;
+            }
+
+            if(disposing)
+            {
+                // Dispose managed code here.
+            }
+
+            try
+            {
+                Close();
+            }
+            catch
+            {
+                // Ignore network errors.
+            }
+
+            this.disposed = true;
+        }
+
+        public void Close()
+        {
+            lock(myLock)
+            {
+                if(this.closed)
+                {
+                    return;
+                }
+
+                try
+                {
+                    this.closing = true;
+                    StopAsyncDelivery();
+                    lock(consumers.SyncRoot)
+                    {
+                        foreach(MessageConsumer consumer in consumers.Values)
+                        {
+                            consumer.Close();
+                        }
+                    }
+                    consumers.Clear();
+
+                    lock(producers.SyncRoot)
+                    {
+                        foreach(MessageProducer producer in producers.Values)
+                        {
+                            producer.Close();
+                        }
+                    }
+                    producers.Clear();
                     Connection.RemoveSession(this);
-                               }
-                               catch(Exception ex)
-                               {
-                                       Tracer.ErrorFormat("Error during 
session close: {0}", ex);
-                               }
-                               finally
-                               {
-                                       this.connection = null;
-                                       this.closed = true;
-                                       this.closing = false;
-                               }
-                       }
-               }
-
-               public IMessageProducer CreateProducer()
-               {
-                       return CreateProducer(null);
-               }
-
-               public IMessageProducer CreateProducer(IDestination destination)
-               {
-                       ProducerInfo command = CreateProducerInfo(destination);
-                       ProducerId producerId = command.ProducerId;
-                       MessageProducer producer = null;
-
-                       try
-                       {
-                               producer = new MessageProducer(this, command);
-                               producers[producerId] = producer;
-                               this.DoSend(command);
-                       }
-                       catch(Exception)
-                       {
-                               if(producer != null)
-                               {
-                                       producer.Close();
-                               }
-
-                               throw;
-                       }
-
-                       return producer;
-               }
-
-               public IMessageConsumer CreateConsumer(IDestination destination)
-               {
-                       return CreateConsumer(destination, null, false);
-               }
-
-               public IMessageConsumer CreateConsumer(IDestination 
destination, string selector)
-               {
-                       return CreateConsumer(destination, selector, false);
-               }
+                }
+                catch(Exception ex)
+                {
+                    Tracer.ErrorFormat("Error during session close: {0}", ex);
+                }
+                finally
+                {
+                    this.connection = null;
+                    this.closed = true;
+                    this.closing = false;
+                }
+            }
+        }
+
+        public IMessageProducer CreateProducer()
+        {
+            return CreateProducer(null);
+        }
+
+        public IMessageProducer CreateProducer(IDestination destination)
+        {
+            ProducerInfo command = CreateProducerInfo(destination);
+            ProducerId producerId = command.ProducerId;
+            MessageProducer producer = null;
+
+            try
+            {
+                producer = new MessageProducer(this, command);
+                producers[producerId] = producer;
+                this.DoSend(command);
+            }
+            catch(Exception)
+            {
+                if(producer != null)
+                {
+                    producer.Close();
+                }
+
+                throw;
+            }
+
+            return producer;
+        }
+
+        public IMessageConsumer CreateConsumer(IDestination destination)
+        {
+            return CreateConsumer(destination, null, false);
+        }
+
+        public IMessageConsumer CreateConsumer(IDestination destination, 
string selector)
+        {
+            return CreateConsumer(destination, selector, false);
+        }
 
-               public IMessageConsumer CreateConsumer(IDestination 
destination, string selector, bool noLocal)
-               {
+        public IMessageConsumer CreateConsumer(IDestination destination, 
string selector, bool noLocal)
+        {
             if (destination == null)
             {
                 throw new InvalidDestinationException("Cannot create a 
Consumer with a Null destination");
             }
 
-                       ConsumerInfo command = CreateConsumerInfo(destination, 
selector);
-                       command.NoLocal = noLocal;
-                       command.AcknowledgementMode = this.AcknowledgementMode;
-
-                       ConsumerId consumerId = command.ConsumerId;
-                       MessageConsumer consumer = null;
-
-                       try
-                       {
-                               consumer = new MessageConsumer(this, command, 
this.AcknowledgementMode);
-                               // lets register the consumer first in case we 
start dispatching messages immediately
-                               consumers[consumerId] = consumer;
-                               this.DoSend(command);
-                               return consumer;
-                       }
-                       catch(Exception)
-                       {
-                               if(consumer != null)
-                               {
-                                       consumer.Close();
-                               }
-
-                               throw;
-                       }
-               }
+            ConsumerInfo command = CreateConsumerInfo(destination, selector);
+            command.NoLocal = noLocal;
+            ConsumerId consumerId = command.ConsumerId;
+            MessageConsumer consumer = null;
+
+            try
+            {
+                consumer = new MessageConsumer(this, command, 
this.AcknowledgementMode);
+                // lets register the consumer first in case we start 
dispatching messages immediately
+                consumers[consumerId] = consumer;
+                this.DoSend(command);
+                return consumer;
+            }
+            catch(Exception)
+            {
+                if(consumer != null)
+                {
+                    consumer.Close();
+                }
 
-               public IMessageConsumer CreateDurableConsumer(ITopic 
destination, string name, string selector, bool noLocal)
-               {
+                throw;
+            }
+        }
+
+        public IMessageConsumer CreateDurableConsumer(ITopic destination, 
string name, string selector, bool noLocal)
+        {
             if (destination == null)
             {
                 throw new InvalidDestinationException("Cannot create a 
Consumer with a Null destination");
             }
-            
+
             ConsumerInfo command = CreateConsumerInfo(destination, selector);
-                       ConsumerId consumerId = command.ConsumerId;
-                       command.SubscriptionName = name;
-                       command.NoLocal = noLocal;
-                       MessageConsumer consumer = null;
-
-                       try
-                       {
-                               consumer = new MessageConsumer(this, command, 
this.AcknowledgementMode);
-                               // lets register the consumer first in case we 
start dispatching messages immediately
-                               consumers[consumerId] = consumer;
-                               this.DoSend(command);
-                       }
-                       catch(Exception)
-                       {
-                               if(consumer != null)
-                               {
-                                       consumer.Close();
-                               }
-
-                               throw;
-                       }
-
-                       return consumer;
-               }
-
-               public void DeleteDurableConsumer(string name)
-               {
-                       RemoveSubscriptionInfo command = new 
RemoveSubscriptionInfo();
-                       command.ConnectionId = Connection.ConnectionId;
-                       command.ClientId = Connection.ClientId;
-                       command.SubcriptionName = name;
-                       this.DoSend(command);
-               }
-
-               public IQueue GetQueue(string name)
-               {
-                       return new ActiveMQQueue(name);
-               }
-
-               public ITopic GetTopic(string name)
-               {
-                       return new ActiveMQTopic(name);
-               }
-
-               public ITemporaryQueue CreateTemporaryQueue()
-               {
-                       ActiveMQTempQueue answer = new 
ActiveMQTempQueue(Connection.CreateTemporaryDestinationName());
-                       CreateTemporaryDestination(answer);
-                       return answer;
-               }
-
-               public ITemporaryTopic CreateTemporaryTopic()
-               {
-                       ActiveMQTempTopic answer = new 
ActiveMQTempTopic(Connection.CreateTemporaryDestinationName());
-                       CreateTemporaryDestination(answer);
-                       return answer;
-               }
-
-               /// <summary>
-               /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic).
-               /// </summary>
-               public void DeleteDestination(IDestination destination)
-               {
-                       DestinationInfo command = new DestinationInfo();
-                       command.ConnectionId = Connection.ConnectionId;
-                       command.OperationType = 
DestinationInfo.REMOVE_OPERATION_TYPE; // 1 is remove
-                       command.Destination = destination;
-
-                       this.DoSend(command);
-               }
-
-               public IMessage CreateMessage()
-               {
-                       ActiveMQMessage answer = new ActiveMQMessage();
-                       Configure(answer);
-                       return answer;
-               }
-
-               public ITextMessage CreateTextMessage()
-               {
-                       ActiveMQTextMessage answer = new ActiveMQTextMessage();
-                       Configure(answer);
-                       return answer;
-               }
-
-               public ITextMessage CreateTextMessage(string text)
-               {
-                       ActiveMQTextMessage answer = new 
ActiveMQTextMessage(text);
-                       Configure(answer);
-                       return answer;
-               }
-
-               public IMapMessage CreateMapMessage()
-               {
-                       return new ActiveMQMapMessage();
-               }
-
-               public IBytesMessage CreateBytesMessage()
-               {
-                       return new ActiveMQBytesMessage();
-               }
-
-               public IBytesMessage CreateBytesMessage(byte[] body)
-               {
-                       ActiveMQBytesMessage answer = new 
ActiveMQBytesMessage();
-                       answer.Content = body;
-                       return answer;
-               }
-
-               public IObjectMessage CreateObjectMessage(object body)
-               {
-                       ActiveMQObjectMessage answer = new 
ActiveMQObjectMessage();
-                       answer.Body = body;
-                       return answer;
-               }
-
-               public void Commit()
-               {
-                       if(!Transacted)
-                       {
-                               throw new InvalidOperationException(
-                                               "You cannot perform a Commit() 
on a non-transacted session. Acknowlegement mode is: "
-                                               + this.AcknowledgementMode);
-                       }
-                       this.TransactionContext.Commit();
-               }
-
-               public void Rollback()
-               {
-                       if(!Transacted)
-                       {
-                               throw new InvalidOperationException(
-                                               "You cannot perform a Commit() 
on a non-transacted session. Acknowlegement mode is: "
-                                               + this.AcknowledgementMode);
-                       }
-                       this.TransactionContext.Rollback();
-
-                       // lets ensure all the consumers redeliver any rolled 
back messages
-                       lock(consumers.SyncRoot)
-                       {
-                               foreach(MessageConsumer consumer in 
consumers.Values)
-                               {
-                                       consumer.RedeliverRolledBackMessages();
-                               }
-                       }
-               }
-
-
-               // Properties
-
-               private TimeSpan requestTimeout = 
Apache.NMS.NMSConstants.defaultRequestTimeout;
-               public TimeSpan RequestTimeout
-               {
-                       get { return this.requestTimeout; }
-                       set { this.requestTimeout = value; }
-               }
-
-               public bool Transacted
-               {
-                       get { return this.AcknowledgementMode == 
AcknowledgementMode.Transactional; }
-               }
-
-               private AcknowledgementMode acknowledgementMode;
-               public AcknowledgementMode AcknowledgementMode
-               {
-                       get { return this.acknowledgementMode; }
-               }
-
-               #endregion
-
-               private void dispatchingThread_ExceptionListener(Exception 
exception)
-               {
-                       if(null != Connection)
-                       {
-                               try
-                               {
-                                       Connection.OnSessionException(this, 
exception);
-                               }
-                               catch
-                               {
-                               }
-                       }
-               }
-
-               protected void CreateTemporaryDestination(ActiveMQDestination 
tempDestination)
-               {
-                       DestinationInfo command = new DestinationInfo();
-                       command.ConnectionId = Connection.ConnectionId;
-                       command.OperationType = 
DestinationInfo.ADD_OPERATION_TYPE; // 0 is add
-                       command.Destination = tempDestination;
-
-                       this.DoSend(command);
-               }
-
-               public void DoSend(Command message)
-               {
-                       this.DoSend(message, this.RequestTimeout);
-               }
-
-               public void DoSend(Command message, TimeSpan requestTimeout)
-               {
-                       if(AsyncSend)
-                       {
-                               Connection.OneWay(message);
-                       }
-                       else
-                       {
-                               Connection.SyncRequest(message, requestTimeout);
-                       }
-               }
-
-               /// <summary>
-               /// Ensures that a transaction is started
-               /// </summary>
-               public void DoStartTransaction()
-               {
-                       if(Transacted)
-                       {
-                               this.TransactionContext.Begin();
-                       }
-               }
-
-               public void DisposeOf(ConsumerId objectId)
-               {
-                       Connection.DisposeOf(objectId);
-                       if(!this.closing)
-                       {
-                               consumers.Remove(objectId);
-                       }
-               }
-
-               public void DisposeOf(ProducerId objectId)
-               {
-                       Connection.DisposeOf(objectId);
-                       if(!this.closing)
-                       {
-                               producers.Remove(objectId);
-                       }
-               }
-
-               public bool DispatchMessage(ConsumerId consumerId, Message 
message)
-               {
-                       bool dispatched = false;
-                       MessageConsumer consumer = (MessageConsumer) 
consumers[consumerId];
-
-                       if(consumer != null)
-                       {
-                               consumer.Dispatch((ActiveMQMessage) message);
-                               dispatched = true;
-                       }
-
-                       return dispatched;
-               }
-
-               /// <summary>
-               /// Private method called by the dispatcher thread in order to 
perform
-               /// asynchronous delivery of queued (inbound) messages.
-               /// </summary>
-               private void DispatchAsyncMessages()
-               {
-                       // lets iterate through each consumer created by this 
session
-                       // ensuring that they have all pending messages 
dispatched
-                       lock(consumers.SyncRoot)
-                       {
-                               foreach(MessageConsumer consumer in 
consumers.Values)
-                               {
-                                       consumer.DispatchAsyncMessages();
-                               }
-                       }
-               }
-
-               protected virtual ConsumerInfo CreateConsumerInfo(IDestination 
destination, string selector)
-               {
-                       ConsumerInfo answer = new ConsumerInfo();
-                       ConsumerId id = new ConsumerId();
-                       id.ConnectionId = info.SessionId.ConnectionId;
-                       id.SessionId = info.SessionId.Value;
-                       id.Value = Interlocked.Increment(ref consumerCounter);
-                       answer.ConsumerId = id;
-                       answer.Destination = 
ActiveMQDestination.Transform(destination);
-                       answer.Selector = selector;
-                       answer.PrefetchSize = this.PrefetchSize;
-                       answer.Priority = this.Priority;
-                       answer.Exclusive = this.Exclusive;
-                       answer.DispatchAsync = this.DispatchAsync;
-                       answer.Retroactive = this.Retroactive;
-
-                       // If the destination contained a URI query, then use 
it to set public properties
-                       // on the ConsumerInfo
-                       ActiveMQDestination amqDestination = destination as 
ActiveMQDestination;
-                       if(amqDestination != null && amqDestination.Options != 
null)
-                       {
-                               URISupport.SetProperties(answer, 
amqDestination.Options, "consumer.");
-                       }
-
-                       return answer;
-               }
-
-               protected virtual ProducerInfo CreateProducerInfo(IDestination 
destination)
-               {
-                       ProducerInfo answer = new ProducerInfo();
-                       ProducerId id = new ProducerId();
-                       id.ConnectionId = info.SessionId.ConnectionId;
-                       id.SessionId = info.SessionId.Value;
-                       id.Value = Interlocked.Increment(ref producerCounter);
-                       answer.ProducerId = id;
-                       answer.Destination = 
ActiveMQDestination.Transform(destination);
-
-                       // If the destination contained a URI query, then use 
it to set public
-                       // properties on the ProducerInfo
-                       ActiveMQDestination amqDestination = destination as 
ActiveMQDestination;
-                       if(amqDestination != null && amqDestination.Options != 
null)
-                       {
-                               URISupport.SetProperties(answer, 
amqDestination.Options, "producer.");
-                       }
-
-                       return answer;
-               }
-
-               /// <summary>
-               /// Configures the message command
-               /// </summary>
-               protected void Configure(ActiveMQMessage message)
-               {
-               }
-
-               internal void StopAsyncDelivery()
-               {
-                       if(startedAsyncDelivery)
-                       {
-                               this.dispatchingThread.ExceptionListener -= 
this.dispatchingThread_ExceptionHandler;
-                               dispatchingThread.Stop((int) 
MAX_THREAD_WAIT.TotalMilliseconds);
-                               startedAsyncDelivery = false;
-                       }
-               }
-
-               internal void StartAsyncDelivery()
-               {
-                       if(!startedAsyncDelivery)
-                       {
-                               this.dispatchingThread.ExceptionListener += 
this.dispatchingThread_ExceptionHandler;
-                               dispatchingThread.Start();
-                               startedAsyncDelivery = true;
-                       }
-               }
-
-               internal void RegisterConsumerDispatcher(Dispatcher dispatcher)
-               {
-                       
dispatcher.SetAsyncDelivery(this.dispatchingThread.EventHandle);
-               }
-       }
+            ConsumerId consumerId = command.ConsumerId;
+            command.SubscriptionName = name;
+            command.NoLocal = noLocal;
+            MessageConsumer consumer = null;
+
+            try
+            {
+                consumer = new MessageConsumer(this, command, 
this.AcknowledgementMode);
+                // lets register the consumer first in case we start 
dispatching messages immediately
+                consumers[consumerId] = consumer;
+                this.DoSend(command);
+            }
+            catch(Exception)
+            {
+                if(consumer != null)
+                {
+                    consumer.Close();
+                }
+
+                throw;
+            }
+
+            return consumer;
+        }
+
+        public void DeleteDurableConsumer(string name)
+        {
+            RemoveSubscriptionInfo command = new RemoveSubscriptionInfo();
+            command.ConnectionId = Connection.ConnectionId;
+            command.ClientId = Connection.ClientId;
+            command.SubcriptionName = name;
+            this.DoSend(command);
+        }
+
+        public IQueue GetQueue(string name)
+        {
+            return new ActiveMQQueue(name);
+        }
+
+        public ITopic GetTopic(string name)
+        {
+            return new ActiveMQTopic(name);
+        }
+
+        public ITemporaryQueue CreateTemporaryQueue()
+        {
+            ActiveMQTempQueue answer = new 
ActiveMQTempQueue(Connection.CreateTemporaryDestinationName());
+            CreateTemporaryDestination(answer);
+            return answer;
+        }
+
+        public ITemporaryTopic CreateTemporaryTopic()
+        {
+            ActiveMQTempTopic answer = new 
ActiveMQTempTopic(Connection.CreateTemporaryDestinationName());
+            CreateTemporaryDestination(answer);
+            return answer;
+        }
+
+        /// <summary>
+        /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic).
+        /// </summary>
+        public void DeleteDestination(IDestination destination)
+        {
+            DestinationInfo command = new DestinationInfo();
+            command.ConnectionId = Connection.ConnectionId;
+            command.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE; // 
1 is remove
+            command.Destination = (ActiveMQDestination) destination;
+
+            this.DoSend(command);
+        }
+
+        public IMessage CreateMessage()
+        {
+            ActiveMQMessage answer = new ActiveMQMessage();
+            Configure(answer);
+            return answer;
+        }
+
+        public ITextMessage CreateTextMessage()
+        {
+            ActiveMQTextMessage answer = new ActiveMQTextMessage();
+            Configure(answer);
+            return answer;
+        }
+
+        public ITextMessage CreateTextMessage(string text)
+        {
+            ActiveMQTextMessage answer = new ActiveMQTextMessage(text);
+            Configure(answer);
+            return answer;
+        }
+
+        public IMapMessage CreateMapMessage()
+        {
+            return new ActiveMQMapMessage();
+        }
+
+        public IBytesMessage CreateBytesMessage()
+        {
+            return new ActiveMQBytesMessage();
+        }
+
+        public IBytesMessage CreateBytesMessage(byte[] body)
+        {
+            ActiveMQBytesMessage answer = new ActiveMQBytesMessage();
+            answer.Content = body;
+            return answer;
+        }
+
+        public IObjectMessage CreateObjectMessage(object body)
+        {
+            ActiveMQObjectMessage answer = new ActiveMQObjectMessage();
+            answer.Body = body;
+            return answer;
+        }
+
+        public void Commit()
+        {
+            if(!Transacted)
+            {
+                throw new InvalidOperationException(
+                        "You cannot perform a Commit() on a non-transacted 
session. Acknowlegement mode is: "
+                        + this.AcknowledgementMode);
+            }
+            this.TransactionContext.Commit();
+        }
+
+        public void Rollback()
+        {
+            if(!Transacted)
+            {
+                throw new InvalidOperationException(
+                        "You cannot perform a Commit() on a non-transacted 
session. Acknowlegement mode is: "
+                        + this.AcknowledgementMode);
+            }
+            this.TransactionContext.Rollback();
+
+            // lets ensure all the consumers redeliver any rolled back messages
+            lock(consumers.SyncRoot)
+            {
+                foreach(MessageConsumer consumer in consumers.Values)
+                {
+                    consumer.RedeliverRolledBackMessages();
+                }
+            }
+        }
+
+
+        // Properties
+
+        private TimeSpan requestTimeout = 
Apache.NMS.NMSConstants.defaultRequestTimeout;
+        public TimeSpan RequestTimeout
+        {
+            get { return this.requestTimeout; }
+            set { this.requestTimeout = value; }
+        }
+
+        public bool Transacted
+        {
+            get { return this.AcknowledgementMode == 
AcknowledgementMode.Transactional; }
+        }
+
+        private AcknowledgementMode acknowledgementMode;
+        public AcknowledgementMode AcknowledgementMode
+        {
+            get { return this.acknowledgementMode; }
+        }
+
+        #endregion
+
+        private void dispatchingThread_ExceptionListener(Exception exception)
+        {
+            if(null != Connection)
+            {
+                try
+                {
+                    Connection.OnSessionException(this, exception);
+                }
+                catch
+                {
+                }
+            }
+        }
+
+        protected void CreateTemporaryDestination(ActiveMQDestination 
tempDestination)
+        {
+            DestinationInfo command = new DestinationInfo();
+            command.ConnectionId = Connection.ConnectionId;
+            command.OperationType = DestinationInfo.ADD_OPERATION_TYPE; // 0 
is add
+            command.Destination = tempDestination;
+
+            this.DoSend(command);
+        }
+
+        public void DoSend(Command message)
+        {
+            this.DoSend(message, this.RequestTimeout);
+        }
+
+        public void DoSend(Command message, TimeSpan requestTimeout)
+        {
+            if(AsyncSend)
+            {
+                Connection.OneWay(message);
+            }
+            else
+            {
+                Connection.SyncRequest(message, requestTimeout);
+            }
+        }
+
+        /// <summary>
+        /// Ensures that a transaction is started
+        /// </summary>
+        public void DoStartTransaction()
+        {
+            if(Transacted)
+            {
+                this.TransactionContext.Begin();
+            }
+        }
+
+        public void DisposeOf(ConsumerId objectId)
+        {
+            Connection.DisposeOf(objectId);
+            if(!this.closing)
+            {
+                consumers.Remove(objectId);
+            }
+        }
+
+        public void DisposeOf(ProducerId objectId)
+        {
+            Connection.DisposeOf(objectId);
+            if(!this.closing)
+            {
+                producers.Remove(objectId);
+            }
+        }
+
+        public bool DispatchMessage(ConsumerId consumerId, Message message)
+        {
+            bool dispatched = false;
+            MessageConsumer consumer = (MessageConsumer) consumers[consumerId];
+
+            if(consumer != null)
+            {
+                consumer.Dispatch((ActiveMQMessage) message);
+                dispatched = true;
+            }
+
+            return dispatched;
+        }
+
+        /// <summary>
+        /// Private method called by the dispatcher thread in order to perform
+        /// asynchronous delivery of queued (inbound) messages.
+        /// </summary>
+        private void DispatchAsyncMessages()
+        {
+            // lets iterate through each consumer created by this session
+            // ensuring that they have all pending messages dispatched
+            lock(consumers.SyncRoot)
+            {
+                foreach(MessageConsumer consumer in consumers.Values)
+                {
+                    consumer.DispatchAsyncMessages();
+                }
+            }
+        }
+
+        protected virtual ConsumerInfo CreateConsumerInfo(IDestination 
destination, string selector)
+        {
+            ConsumerInfo answer = new ConsumerInfo();
+            ConsumerId id = new ConsumerId();
+            id.ConnectionId = info.SessionId.ConnectionId;
+            id.SessionId = info.SessionId.Value;
+            id.Value = Interlocked.Increment(ref consumerCounter);
+            answer.ConsumerId = id;
+            answer.Destination = ActiveMQDestination.Transform(destination);
+            answer.Selector = selector;
+            answer.PrefetchSize = this.PrefetchSize;
+            answer.Priority = this.Priority;
+            answer.Exclusive = this.Exclusive;
+            answer.DispatchAsync = this.DispatchAsync;
+            answer.Retroactive = this.Retroactive;
+
+            // If the destination contained a URI query, then use it to set 
public properties
+            // on the ConsumerInfo
+            ActiveMQDestination amqDestination = destination as 
ActiveMQDestination;
+            if(amqDestination != null && amqDestination.Options != null)
+            {
+                URISupport.SetProperties(answer, amqDestination.Options, 
"consumer.");
+            }
+
+            return answer;
+        }
+
+        protected virtual ProducerInfo CreateProducerInfo(IDestination 
destination)
+        {
+            ProducerInfo answer = new ProducerInfo();
+            ProducerId id = new ProducerId();
+            id.ConnectionId = info.SessionId.ConnectionId;
+            id.SessionId = info.SessionId.Value;
+            id.Value = Interlocked.Increment(ref producerCounter);
+            answer.ProducerId = id;
+            answer.Destination = ActiveMQDestination.Transform(destination);
+
+            // If the destination contained a URI query, then use it to set 
public
+            // properties on the ProducerInfo
+            ActiveMQDestination amqDestination = destination as 
ActiveMQDestination;
+            if(amqDestination != null && amqDestination.Options != null)
+            {
+                URISupport.SetProperties(answer, amqDestination.Options, 
"producer.");
+            }
+
+            return answer;
+        }
+
+        /// <summary>
+        /// Configures the message command
+        /// </summary>
+        protected void Configure(ActiveMQMessage message)
+        {
+        }
+
+        internal void StopAsyncDelivery()
+        {
+            if(startedAsyncDelivery)
+            {
+                this.dispatchingThread.ExceptionListener -= 
this.dispatchingThread_ExceptionHandler;
+                dispatchingThread.Stop((int) 
MAX_THREAD_WAIT.TotalMilliseconds);
+                startedAsyncDelivery = false;
+            }
+        }
+
+        internal void StartAsyncDelivery()
+        {
+            if(!startedAsyncDelivery)
+            {
+                this.dispatchingThread.ExceptionListener += 
this.dispatchingThread_ExceptionHandler;
+                dispatchingThread.Start();
+                startedAsyncDelivery = true;
+            }
+        }
+
+        internal void RegisterConsumerDispatcher(Dispatcher dispatcher)
+        {
+            dispatcher.SetAsyncDelivery(this.dispatchingThread.EventHandle);
+        }
+    }
 }

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs?rev=799407&r1=799406&r2=799407&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs
 Thu Jul 30 19:06:34 2009
@@ -19,178 +19,188 @@
 
 namespace Apache.NMS.ActiveMQ.State
 {
-       public class CommandVisitorAdapter : ICommandVisitor
-       {
+    public class CommandVisitorAdapter : ICommandVisitor
+    {
 
-               public virtual Response processAddConnection(ConnectionInfo 
info)
-               {
-                       return null;
-               }
-
-               public virtual Response processAddConsumer(ConsumerInfo info)
-               {
-                       return null;
-               }
-
-               public virtual Response processAddDestination(DestinationInfo 
info)
-               {
-                       return null;
-               }
-
-               public virtual Response processAddProducer(ProducerInfo info)
-               {
-                       return null;
-               }
-
-               public virtual Response processAddSession(SessionInfo info)
-               {
-                       return null;
-               }
-
-               public virtual Response processBeginTransaction(TransactionInfo 
info)
-               {
-                       return null;
-               }
-
-               public virtual Response processBrokerInfo(BrokerInfo info)
-               {
-                       return null;
-               }
-
-               public virtual Response 
processCommitTransactionOnePhase(TransactionInfo info)
-               {
-                       return null;
-               }
-
-               public virtual Response 
processCommitTransactionTwoPhase(TransactionInfo info)
-               {
-                       return null;
-               }
-
-               public virtual Response processEndTransaction(TransactionInfo 
info)
-               {
-                       return null;
-               }
-
-               public virtual Response processFlush(FlushCommand command)
-               {
-                       return null;
-               }
-
-               public virtual Response 
processForgetTransaction(TransactionInfo info)
-               {
-                       return null;
-               }
-
-               public virtual Response processKeepAlive(KeepAliveInfo info)
-               {
-                       return null;
-               }
-
-               public virtual Response processMessage(Message send)
-               {
-                       return null;
-               }
-
-               public virtual Response processMessageAck(MessageAck ack)
-               {
-                       return null;
-               }
-
-               public virtual Response 
processMessageDispatchNotification(MessageDispatchNotification notification)
-               {
-                       return null;
-               }
-
-               public virtual Response processMessagePull(MessagePull pull)
-               {
-                       return null;
-               }
-
-               public virtual Response 
processPrepareTransaction(TransactionInfo info)
-               {
-                       return null;
-               }
-
-               public virtual Response processProducerAck(ProducerAck ack)
-               {
-                       return null;
-               }
-
-               public virtual Response 
processRecoverTransactions(TransactionInfo info)
-               {
-                       return null;
-               }
-
-               public virtual Response processRemoveConnection(ConnectionId id)
-               {
-                       return null;
-               }
-
-               public virtual Response processRemoveConsumer(ConsumerId id)
-               {
-                       return null;
-               }
-
-               public virtual Response 
processRemoveDestination(DestinationInfo info)
-               {
-                       return null;
-               }
-
-               public virtual Response processRemoveProducer(ProducerId id)
-               {
-                       return null;
-               }
-
-               public virtual Response processRemoveSession(SessionId id)
-               {
-                       return null;
-               }
-
-               public virtual Response 
processRemoveSubscription(RemoveSubscriptionInfo info)
-               {
-                       return null;
-               }
-
-               public virtual Response 
processRollbackTransaction(TransactionInfo info)
-               {
-                       return null;
-               }
-
-               public virtual Response processShutdown(ShutdownInfo info)
-               {
-                       return null;
-               }
-
-               public virtual Response processWireFormat(WireFormatInfo info)
-               {
-                       return null;
-               }
-
-               public virtual Response processMessageDispatch(MessageDispatch 
dispatch)
-               {
-                       return null;
-               }
-
-               public virtual Response processControlCommand(ControlCommand 
command)
-               {
-                       return null;
-               }
-
-               public virtual Response 
processConnectionControl(ConnectionControl control)
-               {
-                       return null;
-               }
-
-               public virtual Response processConnectionError(ConnectionError 
error)
-               {
-                       return null;
-               }
-
-               public virtual Response processConsumerControl(ConsumerControl 
control)
-               {
-                       return null;
-               }
+        public virtual Response processAddConnection(ConnectionInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processAddConsumer(ConsumerInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processAddDestination(DestinationInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processAddProducer(ProducerInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processAddSession(SessionInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processBeginTransaction(TransactionInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processBrokerInfo(BrokerInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response 
processCommitTransactionOnePhase(TransactionInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response 
processCommitTransactionTwoPhase(TransactionInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processEndTransaction(TransactionInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processFlushCommand(FlushCommand command)
+        {
+            return null;
+        }
+
+        public virtual Response processForgetTransaction(TransactionInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processKeepAliveInfo(KeepAliveInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processMessage(Message send)
+        {
+            return null;
+        }
+
+        public virtual Response processMessageAck(MessageAck ack)
+        {
+            return null;
+        }
+
+        public virtual Response 
processMessageDispatchNotification(MessageDispatchNotification notification)
+        {
+            return null;
+        }
+
+        public virtual Response processMessagePull(MessagePull pull)
+        {
+            return null;
+        }
+
+        public virtual Response processPrepareTransaction(TransactionInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processProducerAck(ProducerAck ack)
+        {
+            return null;
+        }
+
+        public virtual Response processRecoverTransactions(TransactionInfo 
info)
+        {
+            return null;
+        }
+
+        public virtual Response processRemoveConnection(ConnectionId id)
+        {
+            return null;
+        }
+
+        public virtual Response processRemoveConsumer(ConsumerId id)
+        {
+            return null;
+        }
+
+        public virtual Response processRemoveDestination(DestinationInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processRemoveProducer(ProducerId id)
+        {
+            return null;
+        }
+
+        public virtual Response processRemoveSession(SessionId id)
+        {
+            return null;
+        }
+
+        public virtual Response 
processRemoveSubscriptionInfo(RemoveSubscriptionInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processRollbackTransaction(TransactionInfo 
info)
+        {
+            return null;
+        }
+
+        public virtual Response processShutdownInfo(ShutdownInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processWireFormat(WireFormatInfo info)
+        {
+            return null;
+        }
+
+        public virtual Response processMessageDispatch(MessageDispatch 
dispatch)
+        {
+            return null;
+        }
+
+        public virtual Response processControlCommand(ControlCommand command)
+        {
+            return null;
+        }
+
+        public virtual Response processConnectionControl(ConnectionControl 
control)
+        {
+            return null;
+        }
+
+        public virtual Response processConnectionError(ConnectionError error)
+        {
+            return null;
+        }
+
+        public virtual Response processConsumerControl(ConsumerControl control)
+        {
+            return null;
+        }
+
+        public virtual Response processResponse(Response response)
+        {
+            return null;
+        }
+
+        public virtual Response processReplayCommand(ReplayCommand 
replayCommand)
+        {
+           return null;
+        }
 
-       }
+    }
 }

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs?rev=799407&r1=799406&r2=799407&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs
 Thu Jul 30 19:06:34 2009
@@ -20,76 +20,80 @@
 
 namespace Apache.NMS.ActiveMQ.State
 {
-       public interface ICommandVisitor
-       {
+    public interface ICommandVisitor
+    {
 
-               Response processAddConnection(ConnectionInfo info);
+        Response processAddConnection(ConnectionInfo info);
 
-               Response processAddSession(SessionInfo info);
+        Response processAddSession(SessionInfo info);
 
-               Response processAddProducer(ProducerInfo info);
+        Response processAddProducer(ProducerInfo info);
 
-               Response processAddConsumer(ConsumerInfo info);
+        Response processAddConsumer(ConsumerInfo info);
 
-               Response processRemoveConnection(ConnectionId id);
+        Response processRemoveConnection(ConnectionId id);
 
-               Response processRemoveSession(SessionId id);
+        Response processRemoveSession(SessionId id);
 
-               Response processRemoveProducer(ProducerId id);
+        Response processRemoveProducer(ProducerId id);
 
-               Response processRemoveConsumer(ConsumerId id);
+        Response processRemoveConsumer(ConsumerId id);
 
-               Response processAddDestination(DestinationInfo info);
+        Response processAddDestination(DestinationInfo info);
 
-               Response processRemoveDestination(DestinationInfo info);
+        Response processRemoveDestination(DestinationInfo info);
 
-               Response processRemoveSubscription(RemoveSubscriptionInfo info);
+        Response processRemoveSubscriptionInfo(RemoveSubscriptionInfo info);
 
-               Response processMessage(Message send);
+        Response processMessage(Message send);
 
-               Response processMessageAck(MessageAck ack);
+        Response processMessageAck(MessageAck ack);
 
-               Response processMessagePull(MessagePull pull);
+        Response processMessagePull(MessagePull pull);
 
-               Response processBeginTransaction(TransactionInfo info);
+        Response processBeginTransaction(TransactionInfo info);
 
-               Response processPrepareTransaction(TransactionInfo info);
+        Response processPrepareTransaction(TransactionInfo info);
 
-               Response processCommitTransactionOnePhase(TransactionInfo info);
+        Response processCommitTransactionOnePhase(TransactionInfo info);
 
-               Response processCommitTransactionTwoPhase(TransactionInfo info);
+        Response processCommitTransactionTwoPhase(TransactionInfo info);
 
-               Response processRollbackTransaction(TransactionInfo info);
+        Response processRollbackTransaction(TransactionInfo info);
 
-               Response processWireFormat(WireFormatInfo info);
+        Response processWireFormat(WireFormatInfo info);
 
-               Response processKeepAlive(KeepAliveInfo info);
+        Response processKeepAliveInfo(KeepAliveInfo info);
 
-               Response processShutdown(ShutdownInfo info);
+        Response processShutdownInfo(ShutdownInfo info);
 
-               Response processFlush(FlushCommand command);
+        Response processFlushCommand(FlushCommand command);
 
-               Response processBrokerInfo(BrokerInfo info);
+        Response processBrokerInfo(BrokerInfo info);
 
-               Response processRecoverTransactions(TransactionInfo info);
+        Response processRecoverTransactions(TransactionInfo info);
 
-               Response processForgetTransaction(TransactionInfo info);
+        Response processForgetTransaction(TransactionInfo info);
 
-               Response processEndTransaction(TransactionInfo info);
+        Response processEndTransaction(TransactionInfo info);
 
-               Response 
processMessageDispatchNotification(MessageDispatchNotification notification);
+        Response 
processMessageDispatchNotification(MessageDispatchNotification notification);
 
-               Response processProducerAck(ProducerAck ack);
+        Response processProducerAck(ProducerAck ack);
 
-               Response processMessageDispatch(MessageDispatch dispatch);
+        Response processMessageDispatch(MessageDispatch dispatch);
 
-               Response processControlCommand(ControlCommand command);
+        Response processControlCommand(ControlCommand command);
 
-               Response processConnectionError(ConnectionError error);
+        Response processConnectionError(ConnectionError error);
 
-               Response processConnectionControl(ConnectionControl control);
+        Response processConnectionControl(ConnectionControl control);
 
-               Response processConsumerControl(ConsumerControl control);
+        Response processConsumerControl(ConsumerControl control);
 
-       }
+        Response processResponse(Response response);
+
+        Response processReplayCommand(ReplayCommand replayCommand);
+
+    }
 }

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs?rev=799407&r1=799406&r2=799407&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs
 Thu Jul 30 19:06:34 2009
@@ -24,84 +24,84 @@
 
 namespace Apache.NMS.ActiveMQ.Transport
 {
-       /// <summary>
-       /// A Transport which negotiates the wire format
-       /// </summary>
-       public class WireFormatNegotiator : TransportFilter
-       {
-               private OpenWireFormat wireFormat;
-               private TimeSpan negotiateTimeout = TimeSpan.FromSeconds(15);
-
-               private AtomicBoolean firstStart=new AtomicBoolean(true);
-               private CountDownLatch readyCountDownLatch = new 
CountDownLatch(1);
-               private CountDownLatch wireInfoSentDownLatch = new 
CountDownLatch(1);
-
-               public WireFormatNegotiator(ITransport next, OpenWireFormat 
wireFormat)
-                       : base(next)
-               {
-                       this.wireFormat = wireFormat;
-               }
-
-               public override void Start()
-               {
-                       base.Start();
-                       if (firstStart.CompareAndSet(true, false))
-                       {
-                               try
-                               {
-                                       
next.Oneway(wireFormat.PreferedWireFormatInfo);
-                               }
-                               finally
-                               {
-                                       wireInfoSentDownLatch.countDown();
-                               }
-                       }
-               }
-
-               protected override void Dispose(bool disposing)
-               {
-                       base.Dispose(disposing);
-                       readyCountDownLatch.countDown();
-               }
-
-               public override void Oneway(Command command)
-               {
-                       if (!readyCountDownLatch.await(negotiateTimeout))
-                               throw new IOException("Wire format negotiation 
timeout: peer did not send his wire format.");
-                       next.Oneway(command);
-               }
-
-               protected override void OnCommand(ITransport sender, Command 
command)
-               {
-                       if ( command.GetDataStructureType() == 
WireFormatInfo.ID_WireFormatInfo )
-                       {
-                               WireFormatInfo info = (WireFormatInfo)command;
-                               try
-                               {
-                                       if (!info.Valid)
-                                       {
-                                               throw new IOException("Remote 
wire format magic is invalid");
-                                       }
-                                       
wireInfoSentDownLatch.await(negotiateTimeout);
-                                       wireFormat.renegotiateWireFormat(info);
-                               }
-                               catch (Exception e)
-                               {
-                                       OnException(this, e);
-                               }
-                               finally
-                               {
-                                       readyCountDownLatch.countDown();
-                               }
-                       }
-                       this.commandHandler(sender, command);
-               }
-
-               protected override void OnException(ITransport sender, 
Exception command)
-               {
-                       readyCountDownLatch.countDown();
-                       this.exceptionHandler(sender, command);
-               }
-       }
+    /// <summary>
+    /// A Transport which negotiates the wire format
+    /// </summary>
+    public class WireFormatNegotiator : TransportFilter
+    {
+        private OpenWireFormat wireFormat;
+        private TimeSpan negotiateTimeout = TimeSpan.FromSeconds(15);
+
+        private AtomicBoolean firstStart=new AtomicBoolean(true);
+        private CountDownLatch readyCountDownLatch = new CountDownLatch(1);
+        private CountDownLatch wireInfoSentDownLatch = new CountDownLatch(1);
+
+        public WireFormatNegotiator(ITransport next, OpenWireFormat wireFormat)
+            : base(next)
+        {
+            this.wireFormat = wireFormat;
+        }
+
+        public override void Start()
+        {
+            base.Start();
+            if (firstStart.CompareAndSet(true, false))
+            {
+                try
+                {
+                    next.Oneway(wireFormat.PreferedWireFormatInfo);
+                }
+                finally
+                {
+                    wireInfoSentDownLatch.countDown();
+                }
+            }
+        }
+
+        protected override void Dispose(bool disposing)
+        {
+            base.Dispose(disposing);
+            readyCountDownLatch.countDown();
+        }
+
+        public override void Oneway(Command command)
+        {
+            if (!readyCountDownLatch.await(negotiateTimeout))
+                throw new IOException("Wire format negotiation timeout: peer 
did not send his wire format.");
+            next.Oneway(command);
+        }
+
+        protected override void OnCommand(ITransport sender, Command command)
+        {
+            if ( command.IsWireFormatInfo )
+            {
+                WireFormatInfo info = (WireFormatInfo)command;
+                try
+                {
+                    if (!info.Valid)
+                    {
+                        throw new IOException("Remote wire format magic is 
invalid");
+                    }
+                    wireInfoSentDownLatch.await(negotiateTimeout);
+                    wireFormat.renegotiateWireFormat(info);
+                }
+                catch (Exception e)
+                {
+                    OnException(this, e);
+                }
+                finally
+                {
+                    readyCountDownLatch.countDown();
+                }
+            }
+            this.commandHandler(sender, command);
+        }
+
+        protected override void OnException(ITransport sender, Exception 
command)
+        {
+            readyCountDownLatch.countDown();
+            this.exceptionHandler(sender, command);
+        }
+    }
 }
 


Reply via email to