Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs?rev=799407&r1=799406&r2=799407&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Session.cs Thu Jul 30 19:06:34 2009 @@ -23,610 +23,608 @@ namespace Apache.NMS.ActiveMQ { - /// <summary> - /// Default provider of ISession - /// </summary> - public class Session : ISession - { - /// <summary> - /// Private object used for synchronization, instead of public "this" - /// </summary> - private readonly object myLock = new object(); - private int consumerCounter; - private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable()); - private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable()); - private readonly DispatchingThread dispatchingThread; - private DispatchingThread.ExceptionHandler dispatchingThread_ExceptionHandler; - private readonly SessionInfo info; - private int producerCounter; - internal bool startedAsyncDelivery = false; - private bool disposed = false; - private bool closed = false; - private bool closing = false; - private TimeSpan MAX_THREAD_WAIT = TimeSpan.FromMilliseconds(30000); - - public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode) - { - this.connection = connection; - this.info = info; - this.acknowledgementMode = acknowledgementMode; - this.AsyncSend = connection.AsyncSend; - this.requestTimeout = connection.RequestTimeout; - this.PrefetchSize = 1000; - this.transactionContext = new TransactionContext(this); - this.dispatchingThread = new DispatchingThread(new DispatchingThread.DispatchFunction(DispatchAsyncMessages)); - this.dispatchingThread_ExceptionHandler = new DispatchingThread.ExceptionHandler(dispatchingThread_ExceptionListener); - } - - ~Session() - { - Dispose(false); - } - - /// <summary> - /// Sets the prefetch size, the maximum number of messages a broker will dispatch to consumers - /// until acknowledgements are received. - /// </summary> - public int PrefetchSize; - - /// <summary> - /// Sets the maximum number of messages to keep around per consumer - /// in addition to the prefetch window for non-durable topics until messages - /// will start to be evicted for slow consumers. - /// Must be > 0 to enable this feature - /// </summary> - public int MaximumPendingMessageLimit; - - /// <summary> - /// Enables or disables whether asynchronous dispatch should be used by the broker - /// </summary> - public bool DispatchAsync; - - /// <summary> - /// Enables or disables exclusive consumers when using queues. An exclusive consumer means - /// only one instance of a consumer is allowed to process messages on a queue to preserve order - /// </summary> - public bool Exclusive; - - /// <summary> - /// Enables or disables retroactive mode for consumers; i.e. do they go back in time or not? - /// </summary> - public bool Retroactive; - - /// <summary> - /// Sets the default consumer priority for consumers - /// </summary> - public byte Priority; - - /// <summary> - /// This property indicates whether or not async send is enabled. - /// </summary> - public bool AsyncSend; - - private Connection connection; - public Connection Connection - { - get { return this.connection; } - } - - public SessionId SessionId - { - get { return info.SessionId; } - } - - private TransactionContext transactionContext; - public TransactionContext TransactionContext - { - get { return this.transactionContext; } - } - - #region ISession Members - - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - protected void Dispose(bool disposing) - { - if(this.disposed) - { - return; - } - - if(disposing) - { - // Dispose managed code here. - } - - try - { - Close(); - } - catch - { - // Ignore network errors. - } - - this.disposed = true; - } - - public void Close() - { - lock(myLock) - { - if(this.closed) - { - return; - } - - try - { - this.closing = true; - StopAsyncDelivery(); - lock(consumers.SyncRoot) - { - foreach(MessageConsumer consumer in consumers.Values) - { - consumer.Close(); - } - } - consumers.Clear(); - - lock(producers.SyncRoot) - { - foreach(MessageProducer producer in producers.Values) - { - producer.Close(); - } - } - producers.Clear(); + /// <summary> + /// Default provider of ISession + /// </summary> + public class Session : ISession + { + /// <summary> + /// Private object used for synchronization, instead of public "this" + /// </summary> + private readonly object myLock = new object(); + private int consumerCounter; + private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable()); + private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable()); + private readonly DispatchingThread dispatchingThread; + private DispatchingThread.ExceptionHandler dispatchingThread_ExceptionHandler; + private readonly SessionInfo info; + private int producerCounter; + internal bool startedAsyncDelivery = false; + private bool disposed = false; + private bool closed = false; + private bool closing = false; + private TimeSpan MAX_THREAD_WAIT = TimeSpan.FromMilliseconds(30000); + + public Session(Connection connection, SessionInfo info, AcknowledgementMode acknowledgementMode) + { + this.connection = connection; + this.info = info; + this.acknowledgementMode = acknowledgementMode; + this.AsyncSend = connection.AsyncSend; + this.requestTimeout = connection.RequestTimeout; + this.PrefetchSize = 1000; + this.transactionContext = new TransactionContext(this); + this.dispatchingThread = new DispatchingThread(new DispatchingThread.DispatchFunction(DispatchAsyncMessages)); + this.dispatchingThread_ExceptionHandler = new DispatchingThread.ExceptionHandler(dispatchingThread_ExceptionListener); + } + + ~Session() + { + Dispose(false); + } + + /// <summary> + /// Sets the prefetch size, the maximum number of messages a broker will dispatch to consumers + /// until acknowledgements are received. + /// </summary> + public int PrefetchSize; + + /// <summary> + /// Sets the maximum number of messages to keep around per consumer + /// in addition to the prefetch window for non-durable topics until messages + /// will start to be evicted for slow consumers. + /// Must be > 0 to enable this feature + /// </summary> + public int MaximumPendingMessageLimit; + + /// <summary> + /// Enables or disables whether asynchronous dispatch should be used by the broker + /// </summary> + public bool DispatchAsync; + + /// <summary> + /// Enables or disables exclusive consumers when using queues. An exclusive consumer means + /// only one instance of a consumer is allowed to process messages on a queue to preserve order + /// </summary> + public bool Exclusive; + + /// <summary> + /// Enables or disables retroactive mode for consumers; i.e. do they go back in time or not? + /// </summary> + public bool Retroactive; + + /// <summary> + /// Sets the default consumer priority for consumers + /// </summary> + public byte Priority; + + /// <summary> + /// This property indicates whether or not async send is enabled. + /// </summary> + public bool AsyncSend; + + private Connection connection; + public Connection Connection + { + get { return this.connection; } + } + + public SessionId SessionId + { + get { return info.SessionId; } + } + + private TransactionContext transactionContext; + public TransactionContext TransactionContext + { + get { return this.transactionContext; } + } + + #region ISession Members + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected void Dispose(bool disposing) + { + if(this.disposed) + { + return; + } + + if(disposing) + { + // Dispose managed code here. + } + + try + { + Close(); + } + catch + { + // Ignore network errors. + } + + this.disposed = true; + } + + public void Close() + { + lock(myLock) + { + if(this.closed) + { + return; + } + + try + { + this.closing = true; + StopAsyncDelivery(); + lock(consumers.SyncRoot) + { + foreach(MessageConsumer consumer in consumers.Values) + { + consumer.Close(); + } + } + consumers.Clear(); + + lock(producers.SyncRoot) + { + foreach(MessageProducer producer in producers.Values) + { + producer.Close(); + } + } + producers.Clear(); Connection.RemoveSession(this); - } - catch(Exception ex) - { - Tracer.ErrorFormat("Error during session close: {0}", ex); - } - finally - { - this.connection = null; - this.closed = true; - this.closing = false; - } - } - } - - public IMessageProducer CreateProducer() - { - return CreateProducer(null); - } - - public IMessageProducer CreateProducer(IDestination destination) - { - ProducerInfo command = CreateProducerInfo(destination); - ProducerId producerId = command.ProducerId; - MessageProducer producer = null; - - try - { - producer = new MessageProducer(this, command); - producers[producerId] = producer; - this.DoSend(command); - } - catch(Exception) - { - if(producer != null) - { - producer.Close(); - } - - throw; - } - - return producer; - } - - public IMessageConsumer CreateConsumer(IDestination destination) - { - return CreateConsumer(destination, null, false); - } - - public IMessageConsumer CreateConsumer(IDestination destination, string selector) - { - return CreateConsumer(destination, selector, false); - } + } + catch(Exception ex) + { + Tracer.ErrorFormat("Error during session close: {0}", ex); + } + finally + { + this.connection = null; + this.closed = true; + this.closing = false; + } + } + } + + public IMessageProducer CreateProducer() + { + return CreateProducer(null); + } + + public IMessageProducer CreateProducer(IDestination destination) + { + ProducerInfo command = CreateProducerInfo(destination); + ProducerId producerId = command.ProducerId; + MessageProducer producer = null; + + try + { + producer = new MessageProducer(this, command); + producers[producerId] = producer; + this.DoSend(command); + } + catch(Exception) + { + if(producer != null) + { + producer.Close(); + } + + throw; + } + + return producer; + } + + public IMessageConsumer CreateConsumer(IDestination destination) + { + return CreateConsumer(destination, null, false); + } + + public IMessageConsumer CreateConsumer(IDestination destination, string selector) + { + return CreateConsumer(destination, selector, false); + } - public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal) - { + public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal) + { if (destination == null) { throw new InvalidDestinationException("Cannot create a Consumer with a Null destination"); } - ConsumerInfo command = CreateConsumerInfo(destination, selector); - command.NoLocal = noLocal; - command.AcknowledgementMode = this.AcknowledgementMode; - - ConsumerId consumerId = command.ConsumerId; - MessageConsumer consumer = null; - - try - { - consumer = new MessageConsumer(this, command, this.AcknowledgementMode); - // lets register the consumer first in case we start dispatching messages immediately - consumers[consumerId] = consumer; - this.DoSend(command); - return consumer; - } - catch(Exception) - { - if(consumer != null) - { - consumer.Close(); - } - - throw; - } - } + ConsumerInfo command = CreateConsumerInfo(destination, selector); + command.NoLocal = noLocal; + ConsumerId consumerId = command.ConsumerId; + MessageConsumer consumer = null; + + try + { + consumer = new MessageConsumer(this, command, this.AcknowledgementMode); + // lets register the consumer first in case we start dispatching messages immediately + consumers[consumerId] = consumer; + this.DoSend(command); + return consumer; + } + catch(Exception) + { + if(consumer != null) + { + consumer.Close(); + } - public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal) - { + throw; + } + } + + public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal) + { if (destination == null) { throw new InvalidDestinationException("Cannot create a Consumer with a Null destination"); } - + ConsumerInfo command = CreateConsumerInfo(destination, selector); - ConsumerId consumerId = command.ConsumerId; - command.SubscriptionName = name; - command.NoLocal = noLocal; - MessageConsumer consumer = null; - - try - { - consumer = new MessageConsumer(this, command, this.AcknowledgementMode); - // lets register the consumer first in case we start dispatching messages immediately - consumers[consumerId] = consumer; - this.DoSend(command); - } - catch(Exception) - { - if(consumer != null) - { - consumer.Close(); - } - - throw; - } - - return consumer; - } - - public void DeleteDurableConsumer(string name) - { - RemoveSubscriptionInfo command = new RemoveSubscriptionInfo(); - command.ConnectionId = Connection.ConnectionId; - command.ClientId = Connection.ClientId; - command.SubcriptionName = name; - this.DoSend(command); - } - - public IQueue GetQueue(string name) - { - return new ActiveMQQueue(name); - } - - public ITopic GetTopic(string name) - { - return new ActiveMQTopic(name); - } - - public ITemporaryQueue CreateTemporaryQueue() - { - ActiveMQTempQueue answer = new ActiveMQTempQueue(Connection.CreateTemporaryDestinationName()); - CreateTemporaryDestination(answer); - return answer; - } - - public ITemporaryTopic CreateTemporaryTopic() - { - ActiveMQTempTopic answer = new ActiveMQTempTopic(Connection.CreateTemporaryDestinationName()); - CreateTemporaryDestination(answer); - return answer; - } - - /// <summary> - /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic). - /// </summary> - public void DeleteDestination(IDestination destination) - { - DestinationInfo command = new DestinationInfo(); - command.ConnectionId = Connection.ConnectionId; - command.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE; // 1 is remove - command.Destination = destination; - - this.DoSend(command); - } - - public IMessage CreateMessage() - { - ActiveMQMessage answer = new ActiveMQMessage(); - Configure(answer); - return answer; - } - - public ITextMessage CreateTextMessage() - { - ActiveMQTextMessage answer = new ActiveMQTextMessage(); - Configure(answer); - return answer; - } - - public ITextMessage CreateTextMessage(string text) - { - ActiveMQTextMessage answer = new ActiveMQTextMessage(text); - Configure(answer); - return answer; - } - - public IMapMessage CreateMapMessage() - { - return new ActiveMQMapMessage(); - } - - public IBytesMessage CreateBytesMessage() - { - return new ActiveMQBytesMessage(); - } - - public IBytesMessage CreateBytesMessage(byte[] body) - { - ActiveMQBytesMessage answer = new ActiveMQBytesMessage(); - answer.Content = body; - return answer; - } - - public IObjectMessage CreateObjectMessage(object body) - { - ActiveMQObjectMessage answer = new ActiveMQObjectMessage(); - answer.Body = body; - return answer; - } - - public void Commit() - { - if(!Transacted) - { - throw new InvalidOperationException( - "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " - + this.AcknowledgementMode); - } - this.TransactionContext.Commit(); - } - - public void Rollback() - { - if(!Transacted) - { - throw new InvalidOperationException( - "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " - + this.AcknowledgementMode); - } - this.TransactionContext.Rollback(); - - // lets ensure all the consumers redeliver any rolled back messages - lock(consumers.SyncRoot) - { - foreach(MessageConsumer consumer in consumers.Values) - { - consumer.RedeliverRolledBackMessages(); - } - } - } - - - // Properties - - private TimeSpan requestTimeout = Apache.NMS.NMSConstants.defaultRequestTimeout; - public TimeSpan RequestTimeout - { - get { return this.requestTimeout; } - set { this.requestTimeout = value; } - } - - public bool Transacted - { - get { return this.AcknowledgementMode == AcknowledgementMode.Transactional; } - } - - private AcknowledgementMode acknowledgementMode; - public AcknowledgementMode AcknowledgementMode - { - get { return this.acknowledgementMode; } - } - - #endregion - - private void dispatchingThread_ExceptionListener(Exception exception) - { - if(null != Connection) - { - try - { - Connection.OnSessionException(this, exception); - } - catch - { - } - } - } - - protected void CreateTemporaryDestination(ActiveMQDestination tempDestination) - { - DestinationInfo command = new DestinationInfo(); - command.ConnectionId = Connection.ConnectionId; - command.OperationType = DestinationInfo.ADD_OPERATION_TYPE; // 0 is add - command.Destination = tempDestination; - - this.DoSend(command); - } - - public void DoSend(Command message) - { - this.DoSend(message, this.RequestTimeout); - } - - public void DoSend(Command message, TimeSpan requestTimeout) - { - if(AsyncSend) - { - Connection.OneWay(message); - } - else - { - Connection.SyncRequest(message, requestTimeout); - } - } - - /// <summary> - /// Ensures that a transaction is started - /// </summary> - public void DoStartTransaction() - { - if(Transacted) - { - this.TransactionContext.Begin(); - } - } - - public void DisposeOf(ConsumerId objectId) - { - Connection.DisposeOf(objectId); - if(!this.closing) - { - consumers.Remove(objectId); - } - } - - public void DisposeOf(ProducerId objectId) - { - Connection.DisposeOf(objectId); - if(!this.closing) - { - producers.Remove(objectId); - } - } - - public bool DispatchMessage(ConsumerId consumerId, Message message) - { - bool dispatched = false; - MessageConsumer consumer = (MessageConsumer) consumers[consumerId]; - - if(consumer != null) - { - consumer.Dispatch((ActiveMQMessage) message); - dispatched = true; - } - - return dispatched; - } - - /// <summary> - /// Private method called by the dispatcher thread in order to perform - /// asynchronous delivery of queued (inbound) messages. - /// </summary> - private void DispatchAsyncMessages() - { - // lets iterate through each consumer created by this session - // ensuring that they have all pending messages dispatched - lock(consumers.SyncRoot) - { - foreach(MessageConsumer consumer in consumers.Values) - { - consumer.DispatchAsyncMessages(); - } - } - } - - protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector) - { - ConsumerInfo answer = new ConsumerInfo(); - ConsumerId id = new ConsumerId(); - id.ConnectionId = info.SessionId.ConnectionId; - id.SessionId = info.SessionId.Value; - id.Value = Interlocked.Increment(ref consumerCounter); - answer.ConsumerId = id; - answer.Destination = ActiveMQDestination.Transform(destination); - answer.Selector = selector; - answer.PrefetchSize = this.PrefetchSize; - answer.Priority = this.Priority; - answer.Exclusive = this.Exclusive; - answer.DispatchAsync = this.DispatchAsync; - answer.Retroactive = this.Retroactive; - - // If the destination contained a URI query, then use it to set public properties - // on the ConsumerInfo - ActiveMQDestination amqDestination = destination as ActiveMQDestination; - if(amqDestination != null && amqDestination.Options != null) - { - URISupport.SetProperties(answer, amqDestination.Options, "consumer."); - } - - return answer; - } - - protected virtual ProducerInfo CreateProducerInfo(IDestination destination) - { - ProducerInfo answer = new ProducerInfo(); - ProducerId id = new ProducerId(); - id.ConnectionId = info.SessionId.ConnectionId; - id.SessionId = info.SessionId.Value; - id.Value = Interlocked.Increment(ref producerCounter); - answer.ProducerId = id; - answer.Destination = ActiveMQDestination.Transform(destination); - - // If the destination contained a URI query, then use it to set public - // properties on the ProducerInfo - ActiveMQDestination amqDestination = destination as ActiveMQDestination; - if(amqDestination != null && amqDestination.Options != null) - { - URISupport.SetProperties(answer, amqDestination.Options, "producer."); - } - - return answer; - } - - /// <summary> - /// Configures the message command - /// </summary> - protected void Configure(ActiveMQMessage message) - { - } - - internal void StopAsyncDelivery() - { - if(startedAsyncDelivery) - { - this.dispatchingThread.ExceptionListener -= this.dispatchingThread_ExceptionHandler; - dispatchingThread.Stop((int) MAX_THREAD_WAIT.TotalMilliseconds); - startedAsyncDelivery = false; - } - } - - internal void StartAsyncDelivery() - { - if(!startedAsyncDelivery) - { - this.dispatchingThread.ExceptionListener += this.dispatchingThread_ExceptionHandler; - dispatchingThread.Start(); - startedAsyncDelivery = true; - } - } - - internal void RegisterConsumerDispatcher(Dispatcher dispatcher) - { - dispatcher.SetAsyncDelivery(this.dispatchingThread.EventHandle); - } - } + ConsumerId consumerId = command.ConsumerId; + command.SubscriptionName = name; + command.NoLocal = noLocal; + MessageConsumer consumer = null; + + try + { + consumer = new MessageConsumer(this, command, this.AcknowledgementMode); + // lets register the consumer first in case we start dispatching messages immediately + consumers[consumerId] = consumer; + this.DoSend(command); + } + catch(Exception) + { + if(consumer != null) + { + consumer.Close(); + } + + throw; + } + + return consumer; + } + + public void DeleteDurableConsumer(string name) + { + RemoveSubscriptionInfo command = new RemoveSubscriptionInfo(); + command.ConnectionId = Connection.ConnectionId; + command.ClientId = Connection.ClientId; + command.SubcriptionName = name; + this.DoSend(command); + } + + public IQueue GetQueue(string name) + { + return new ActiveMQQueue(name); + } + + public ITopic GetTopic(string name) + { + return new ActiveMQTopic(name); + } + + public ITemporaryQueue CreateTemporaryQueue() + { + ActiveMQTempQueue answer = new ActiveMQTempQueue(Connection.CreateTemporaryDestinationName()); + CreateTemporaryDestination(answer); + return answer; + } + + public ITemporaryTopic CreateTemporaryTopic() + { + ActiveMQTempTopic answer = new ActiveMQTempTopic(Connection.CreateTemporaryDestinationName()); + CreateTemporaryDestination(answer); + return answer; + } + + /// <summary> + /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic). + /// </summary> + public void DeleteDestination(IDestination destination) + { + DestinationInfo command = new DestinationInfo(); + command.ConnectionId = Connection.ConnectionId; + command.OperationType = DestinationInfo.REMOVE_OPERATION_TYPE; // 1 is remove + command.Destination = (ActiveMQDestination) destination; + + this.DoSend(command); + } + + public IMessage CreateMessage() + { + ActiveMQMessage answer = new ActiveMQMessage(); + Configure(answer); + return answer; + } + + public ITextMessage CreateTextMessage() + { + ActiveMQTextMessage answer = new ActiveMQTextMessage(); + Configure(answer); + return answer; + } + + public ITextMessage CreateTextMessage(string text) + { + ActiveMQTextMessage answer = new ActiveMQTextMessage(text); + Configure(answer); + return answer; + } + + public IMapMessage CreateMapMessage() + { + return new ActiveMQMapMessage(); + } + + public IBytesMessage CreateBytesMessage() + { + return new ActiveMQBytesMessage(); + } + + public IBytesMessage CreateBytesMessage(byte[] body) + { + ActiveMQBytesMessage answer = new ActiveMQBytesMessage(); + answer.Content = body; + return answer; + } + + public IObjectMessage CreateObjectMessage(object body) + { + ActiveMQObjectMessage answer = new ActiveMQObjectMessage(); + answer.Body = body; + return answer; + } + + public void Commit() + { + if(!Transacted) + { + throw new InvalidOperationException( + "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + + this.AcknowledgementMode); + } + this.TransactionContext.Commit(); + } + + public void Rollback() + { + if(!Transacted) + { + throw new InvalidOperationException( + "You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + + this.AcknowledgementMode); + } + this.TransactionContext.Rollback(); + + // lets ensure all the consumers redeliver any rolled back messages + lock(consumers.SyncRoot) + { + foreach(MessageConsumer consumer in consumers.Values) + { + consumer.RedeliverRolledBackMessages(); + } + } + } + + + // Properties + + private TimeSpan requestTimeout = Apache.NMS.NMSConstants.defaultRequestTimeout; + public TimeSpan RequestTimeout + { + get { return this.requestTimeout; } + set { this.requestTimeout = value; } + } + + public bool Transacted + { + get { return this.AcknowledgementMode == AcknowledgementMode.Transactional; } + } + + private AcknowledgementMode acknowledgementMode; + public AcknowledgementMode AcknowledgementMode + { + get { return this.acknowledgementMode; } + } + + #endregion + + private void dispatchingThread_ExceptionListener(Exception exception) + { + if(null != Connection) + { + try + { + Connection.OnSessionException(this, exception); + } + catch + { + } + } + } + + protected void CreateTemporaryDestination(ActiveMQDestination tempDestination) + { + DestinationInfo command = new DestinationInfo(); + command.ConnectionId = Connection.ConnectionId; + command.OperationType = DestinationInfo.ADD_OPERATION_TYPE; // 0 is add + command.Destination = tempDestination; + + this.DoSend(command); + } + + public void DoSend(Command message) + { + this.DoSend(message, this.RequestTimeout); + } + + public void DoSend(Command message, TimeSpan requestTimeout) + { + if(AsyncSend) + { + Connection.OneWay(message); + } + else + { + Connection.SyncRequest(message, requestTimeout); + } + } + + /// <summary> + /// Ensures that a transaction is started + /// </summary> + public void DoStartTransaction() + { + if(Transacted) + { + this.TransactionContext.Begin(); + } + } + + public void DisposeOf(ConsumerId objectId) + { + Connection.DisposeOf(objectId); + if(!this.closing) + { + consumers.Remove(objectId); + } + } + + public void DisposeOf(ProducerId objectId) + { + Connection.DisposeOf(objectId); + if(!this.closing) + { + producers.Remove(objectId); + } + } + + public bool DispatchMessage(ConsumerId consumerId, Message message) + { + bool dispatched = false; + MessageConsumer consumer = (MessageConsumer) consumers[consumerId]; + + if(consumer != null) + { + consumer.Dispatch((ActiveMQMessage) message); + dispatched = true; + } + + return dispatched; + } + + /// <summary> + /// Private method called by the dispatcher thread in order to perform + /// asynchronous delivery of queued (inbound) messages. + /// </summary> + private void DispatchAsyncMessages() + { + // lets iterate through each consumer created by this session + // ensuring that they have all pending messages dispatched + lock(consumers.SyncRoot) + { + foreach(MessageConsumer consumer in consumers.Values) + { + consumer.DispatchAsyncMessages(); + } + } + } + + protected virtual ConsumerInfo CreateConsumerInfo(IDestination destination, string selector) + { + ConsumerInfo answer = new ConsumerInfo(); + ConsumerId id = new ConsumerId(); + id.ConnectionId = info.SessionId.ConnectionId; + id.SessionId = info.SessionId.Value; + id.Value = Interlocked.Increment(ref consumerCounter); + answer.ConsumerId = id; + answer.Destination = ActiveMQDestination.Transform(destination); + answer.Selector = selector; + answer.PrefetchSize = this.PrefetchSize; + answer.Priority = this.Priority; + answer.Exclusive = this.Exclusive; + answer.DispatchAsync = this.DispatchAsync; + answer.Retroactive = this.Retroactive; + + // If the destination contained a URI query, then use it to set public properties + // on the ConsumerInfo + ActiveMQDestination amqDestination = destination as ActiveMQDestination; + if(amqDestination != null && amqDestination.Options != null) + { + URISupport.SetProperties(answer, amqDestination.Options, "consumer."); + } + + return answer; + } + + protected virtual ProducerInfo CreateProducerInfo(IDestination destination) + { + ProducerInfo answer = new ProducerInfo(); + ProducerId id = new ProducerId(); + id.ConnectionId = info.SessionId.ConnectionId; + id.SessionId = info.SessionId.Value; + id.Value = Interlocked.Increment(ref producerCounter); + answer.ProducerId = id; + answer.Destination = ActiveMQDestination.Transform(destination); + + // If the destination contained a URI query, then use it to set public + // properties on the ProducerInfo + ActiveMQDestination amqDestination = destination as ActiveMQDestination; + if(amqDestination != null && amqDestination.Options != null) + { + URISupport.SetProperties(answer, amqDestination.Options, "producer."); + } + + return answer; + } + + /// <summary> + /// Configures the message command + /// </summary> + protected void Configure(ActiveMQMessage message) + { + } + + internal void StopAsyncDelivery() + { + if(startedAsyncDelivery) + { + this.dispatchingThread.ExceptionListener -= this.dispatchingThread_ExceptionHandler; + dispatchingThread.Stop((int) MAX_THREAD_WAIT.TotalMilliseconds); + startedAsyncDelivery = false; + } + } + + internal void StartAsyncDelivery() + { + if(!startedAsyncDelivery) + { + this.dispatchingThread.ExceptionListener += this.dispatchingThread_ExceptionHandler; + dispatchingThread.Start(); + startedAsyncDelivery = true; + } + } + + internal void RegisterConsumerDispatcher(Dispatcher dispatcher) + { + dispatcher.SetAsyncDelivery(this.dispatchingThread.EventHandle); + } + } }
Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs?rev=799407&r1=799406&r2=799407&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/CommandVisitorAdapter.cs Thu Jul 30 19:06:34 2009 @@ -19,178 +19,188 @@ namespace Apache.NMS.ActiveMQ.State { - public class CommandVisitorAdapter : ICommandVisitor - { + public class CommandVisitorAdapter : ICommandVisitor + { - public virtual Response processAddConnection(ConnectionInfo info) - { - return null; - } - - public virtual Response processAddConsumer(ConsumerInfo info) - { - return null; - } - - public virtual Response processAddDestination(DestinationInfo info) - { - return null; - } - - public virtual Response processAddProducer(ProducerInfo info) - { - return null; - } - - public virtual Response processAddSession(SessionInfo info) - { - return null; - } - - public virtual Response processBeginTransaction(TransactionInfo info) - { - return null; - } - - public virtual Response processBrokerInfo(BrokerInfo info) - { - return null; - } - - public virtual Response processCommitTransactionOnePhase(TransactionInfo info) - { - return null; - } - - public virtual Response processCommitTransactionTwoPhase(TransactionInfo info) - { - return null; - } - - public virtual Response processEndTransaction(TransactionInfo info) - { - return null; - } - - public virtual Response processFlush(FlushCommand command) - { - return null; - } - - public virtual Response processForgetTransaction(TransactionInfo info) - { - return null; - } - - public virtual Response processKeepAlive(KeepAliveInfo info) - { - return null; - } - - public virtual Response processMessage(Message send) - { - return null; - } - - public virtual Response processMessageAck(MessageAck ack) - { - return null; - } - - public virtual Response processMessageDispatchNotification(MessageDispatchNotification notification) - { - return null; - } - - public virtual Response processMessagePull(MessagePull pull) - { - return null; - } - - public virtual Response processPrepareTransaction(TransactionInfo info) - { - return null; - } - - public virtual Response processProducerAck(ProducerAck ack) - { - return null; - } - - public virtual Response processRecoverTransactions(TransactionInfo info) - { - return null; - } - - public virtual Response processRemoveConnection(ConnectionId id) - { - return null; - } - - public virtual Response processRemoveConsumer(ConsumerId id) - { - return null; - } - - public virtual Response processRemoveDestination(DestinationInfo info) - { - return null; - } - - public virtual Response processRemoveProducer(ProducerId id) - { - return null; - } - - public virtual Response processRemoveSession(SessionId id) - { - return null; - } - - public virtual Response processRemoveSubscription(RemoveSubscriptionInfo info) - { - return null; - } - - public virtual Response processRollbackTransaction(TransactionInfo info) - { - return null; - } - - public virtual Response processShutdown(ShutdownInfo info) - { - return null; - } - - public virtual Response processWireFormat(WireFormatInfo info) - { - return null; - } - - public virtual Response processMessageDispatch(MessageDispatch dispatch) - { - return null; - } - - public virtual Response processControlCommand(ControlCommand command) - { - return null; - } - - public virtual Response processConnectionControl(ConnectionControl control) - { - return null; - } - - public virtual Response processConnectionError(ConnectionError error) - { - return null; - } - - public virtual Response processConsumerControl(ConsumerControl control) - { - return null; - } + public virtual Response processAddConnection(ConnectionInfo info) + { + return null; + } + + public virtual Response processAddConsumer(ConsumerInfo info) + { + return null; + } + + public virtual Response processAddDestination(DestinationInfo info) + { + return null; + } + + public virtual Response processAddProducer(ProducerInfo info) + { + return null; + } + + public virtual Response processAddSession(SessionInfo info) + { + return null; + } + + public virtual Response processBeginTransaction(TransactionInfo info) + { + return null; + } + + public virtual Response processBrokerInfo(BrokerInfo info) + { + return null; + } + + public virtual Response processCommitTransactionOnePhase(TransactionInfo info) + { + return null; + } + + public virtual Response processCommitTransactionTwoPhase(TransactionInfo info) + { + return null; + } + + public virtual Response processEndTransaction(TransactionInfo info) + { + return null; + } + + public virtual Response processFlushCommand(FlushCommand command) + { + return null; + } + + public virtual Response processForgetTransaction(TransactionInfo info) + { + return null; + } + + public virtual Response processKeepAliveInfo(KeepAliveInfo info) + { + return null; + } + + public virtual Response processMessage(Message send) + { + return null; + } + + public virtual Response processMessageAck(MessageAck ack) + { + return null; + } + + public virtual Response processMessageDispatchNotification(MessageDispatchNotification notification) + { + return null; + } + + public virtual Response processMessagePull(MessagePull pull) + { + return null; + } + + public virtual Response processPrepareTransaction(TransactionInfo info) + { + return null; + } + + public virtual Response processProducerAck(ProducerAck ack) + { + return null; + } + + public virtual Response processRecoverTransactions(TransactionInfo info) + { + return null; + } + + public virtual Response processRemoveConnection(ConnectionId id) + { + return null; + } + + public virtual Response processRemoveConsumer(ConsumerId id) + { + return null; + } + + public virtual Response processRemoveDestination(DestinationInfo info) + { + return null; + } + + public virtual Response processRemoveProducer(ProducerId id) + { + return null; + } + + public virtual Response processRemoveSession(SessionId id) + { + return null; + } + + public virtual Response processRemoveSubscriptionInfo(RemoveSubscriptionInfo info) + { + return null; + } + + public virtual Response processRollbackTransaction(TransactionInfo info) + { + return null; + } + + public virtual Response processShutdownInfo(ShutdownInfo info) + { + return null; + } + + public virtual Response processWireFormat(WireFormatInfo info) + { + return null; + } + + public virtual Response processMessageDispatch(MessageDispatch dispatch) + { + return null; + } + + public virtual Response processControlCommand(ControlCommand command) + { + return null; + } + + public virtual Response processConnectionControl(ConnectionControl control) + { + return null; + } + + public virtual Response processConnectionError(ConnectionError error) + { + return null; + } + + public virtual Response processConsumerControl(ConsumerControl control) + { + return null; + } + + public virtual Response processResponse(Response response) + { + return null; + } + + public virtual Response processReplayCommand(ReplayCommand replayCommand) + { + return null; + } - } + } } Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs?rev=799407&r1=799406&r2=799407&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/State/ICommandVisitor.cs Thu Jul 30 19:06:34 2009 @@ -20,76 +20,80 @@ namespace Apache.NMS.ActiveMQ.State { - public interface ICommandVisitor - { + public interface ICommandVisitor + { - Response processAddConnection(ConnectionInfo info); + Response processAddConnection(ConnectionInfo info); - Response processAddSession(SessionInfo info); + Response processAddSession(SessionInfo info); - Response processAddProducer(ProducerInfo info); + Response processAddProducer(ProducerInfo info); - Response processAddConsumer(ConsumerInfo info); + Response processAddConsumer(ConsumerInfo info); - Response processRemoveConnection(ConnectionId id); + Response processRemoveConnection(ConnectionId id); - Response processRemoveSession(SessionId id); + Response processRemoveSession(SessionId id); - Response processRemoveProducer(ProducerId id); + Response processRemoveProducer(ProducerId id); - Response processRemoveConsumer(ConsumerId id); + Response processRemoveConsumer(ConsumerId id); - Response processAddDestination(DestinationInfo info); + Response processAddDestination(DestinationInfo info); - Response processRemoveDestination(DestinationInfo info); + Response processRemoveDestination(DestinationInfo info); - Response processRemoveSubscription(RemoveSubscriptionInfo info); + Response processRemoveSubscriptionInfo(RemoveSubscriptionInfo info); - Response processMessage(Message send); + Response processMessage(Message send); - Response processMessageAck(MessageAck ack); + Response processMessageAck(MessageAck ack); - Response processMessagePull(MessagePull pull); + Response processMessagePull(MessagePull pull); - Response processBeginTransaction(TransactionInfo info); + Response processBeginTransaction(TransactionInfo info); - Response processPrepareTransaction(TransactionInfo info); + Response processPrepareTransaction(TransactionInfo info); - Response processCommitTransactionOnePhase(TransactionInfo info); + Response processCommitTransactionOnePhase(TransactionInfo info); - Response processCommitTransactionTwoPhase(TransactionInfo info); + Response processCommitTransactionTwoPhase(TransactionInfo info); - Response processRollbackTransaction(TransactionInfo info); + Response processRollbackTransaction(TransactionInfo info); - Response processWireFormat(WireFormatInfo info); + Response processWireFormat(WireFormatInfo info); - Response processKeepAlive(KeepAliveInfo info); + Response processKeepAliveInfo(KeepAliveInfo info); - Response processShutdown(ShutdownInfo info); + Response processShutdownInfo(ShutdownInfo info); - Response processFlush(FlushCommand command); + Response processFlushCommand(FlushCommand command); - Response processBrokerInfo(BrokerInfo info); + Response processBrokerInfo(BrokerInfo info); - Response processRecoverTransactions(TransactionInfo info); + Response processRecoverTransactions(TransactionInfo info); - Response processForgetTransaction(TransactionInfo info); + Response processForgetTransaction(TransactionInfo info); - Response processEndTransaction(TransactionInfo info); + Response processEndTransaction(TransactionInfo info); - Response processMessageDispatchNotification(MessageDispatchNotification notification); + Response processMessageDispatchNotification(MessageDispatchNotification notification); - Response processProducerAck(ProducerAck ack); + Response processProducerAck(ProducerAck ack); - Response processMessageDispatch(MessageDispatch dispatch); + Response processMessageDispatch(MessageDispatch dispatch); - Response processControlCommand(ControlCommand command); + Response processControlCommand(ControlCommand command); - Response processConnectionError(ConnectionError error); + Response processConnectionError(ConnectionError error); - Response processConnectionControl(ConnectionControl control); + Response processConnectionControl(ConnectionControl control); - Response processConsumerControl(ConsumerControl control); + Response processConsumerControl(ConsumerControl control); - } + Response processResponse(Response response); + + Response processReplayCommand(ReplayCommand replayCommand); + + } } Modified: activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs?rev=799407&r1=799406&r2=799407&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/WireFormatNegotiator.cs Thu Jul 30 19:06:34 2009 @@ -24,84 +24,84 @@ namespace Apache.NMS.ActiveMQ.Transport { - /// <summary> - /// A Transport which negotiates the wire format - /// </summary> - public class WireFormatNegotiator : TransportFilter - { - private OpenWireFormat wireFormat; - private TimeSpan negotiateTimeout = TimeSpan.FromSeconds(15); - - private AtomicBoolean firstStart=new AtomicBoolean(true); - private CountDownLatch readyCountDownLatch = new CountDownLatch(1); - private CountDownLatch wireInfoSentDownLatch = new CountDownLatch(1); - - public WireFormatNegotiator(ITransport next, OpenWireFormat wireFormat) - : base(next) - { - this.wireFormat = wireFormat; - } - - public override void Start() - { - base.Start(); - if (firstStart.CompareAndSet(true, false)) - { - try - { - next.Oneway(wireFormat.PreferedWireFormatInfo); - } - finally - { - wireInfoSentDownLatch.countDown(); - } - } - } - - protected override void Dispose(bool disposing) - { - base.Dispose(disposing); - readyCountDownLatch.countDown(); - } - - public override void Oneway(Command command) - { - if (!readyCountDownLatch.await(negotiateTimeout)) - throw new IOException("Wire format negotiation timeout: peer did not send his wire format."); - next.Oneway(command); - } - - protected override void OnCommand(ITransport sender, Command command) - { - if ( command.GetDataStructureType() == WireFormatInfo.ID_WireFormatInfo ) - { - WireFormatInfo info = (WireFormatInfo)command; - try - { - if (!info.Valid) - { - throw new IOException("Remote wire format magic is invalid"); - } - wireInfoSentDownLatch.await(negotiateTimeout); - wireFormat.renegotiateWireFormat(info); - } - catch (Exception e) - { - OnException(this, e); - } - finally - { - readyCountDownLatch.countDown(); - } - } - this.commandHandler(sender, command); - } - - protected override void OnException(ITransport sender, Exception command) - { - readyCountDownLatch.countDown(); - this.exceptionHandler(sender, command); - } - } + /// <summary> + /// A Transport which negotiates the wire format + /// </summary> + public class WireFormatNegotiator : TransportFilter + { + private OpenWireFormat wireFormat; + private TimeSpan negotiateTimeout = TimeSpan.FromSeconds(15); + + private AtomicBoolean firstStart=new AtomicBoolean(true); + private CountDownLatch readyCountDownLatch = new CountDownLatch(1); + private CountDownLatch wireInfoSentDownLatch = new CountDownLatch(1); + + public WireFormatNegotiator(ITransport next, OpenWireFormat wireFormat) + : base(next) + { + this.wireFormat = wireFormat; + } + + public override void Start() + { + base.Start(); + if (firstStart.CompareAndSet(true, false)) + { + try + { + next.Oneway(wireFormat.PreferedWireFormatInfo); + } + finally + { + wireInfoSentDownLatch.countDown(); + } + } + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + readyCountDownLatch.countDown(); + } + + public override void Oneway(Command command) + { + if (!readyCountDownLatch.await(negotiateTimeout)) + throw new IOException("Wire format negotiation timeout: peer did not send his wire format."); + next.Oneway(command); + } + + protected override void OnCommand(ITransport sender, Command command) + { + if ( command.IsWireFormatInfo ) + { + WireFormatInfo info = (WireFormatInfo)command; + try + { + if (!info.Valid) + { + throw new IOException("Remote wire format magic is invalid"); + } + wireInfoSentDownLatch.await(negotiateTimeout); + wireFormat.renegotiateWireFormat(info); + } + catch (Exception e) + { + OnException(this, e); + } + finally + { + readyCountDownLatch.countDown(); + } + } + this.commandHandler(sender, command); + } + + protected override void OnException(ITransport sender, Exception command) + { + readyCountDownLatch.countDown(); + this.exceptionHandler(sender, command); + } + } }
