Added: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputQueueChannelBase.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputQueueChannelBase.cs?rev=726083&view=auto ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputQueueChannelBase.cs (added) +++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputQueueChannelBase.cs Fri Dec 12 10:25:52 2008 @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.ServiceModel; +using System.ServiceModel.Channels; + +namespace Apache.NMS.WCF +{ + /// <summary> + /// Base class for NMS input channels. + /// </summary> + /// <typeparam name="T"></typeparam> + public abstract class NmsInputQueueChannelBase<T> : ChannelBase where T : class + { + #region Constructors + + /// <summary> + /// Initializes a new instance of the <see cref="NmsInputQueueChannelBase<T>"/> class. + /// </summary> + /// <param name="factory">The factory that was used to create the channel.</param> + /// <param name="localAddress">The local address of the channel.</param> + public NmsInputQueueChannelBase(ChannelListenerBase factory, EndpointAddress localAddress) + : base(factory) + { + _localAddress = localAddress; + _messageQueue = new InputQueue<T>(); + } + + #endregion + + #region Public properties + + /// <summary> + /// Gets the local address. + /// </summary> + /// <value>The local address.</value> + public EndpointAddress LocalAddress + { + get { return _localAddress; } + } + + #endregion + + #region Messaging + + /// <summary> + /// Gets the pending message count. + /// </summary> + /// <value>The pending message count.</value> + public int PendingMessageCount + { + get + { + return _messageQueue.PendingCount; + } + } + + /// <summary> + /// Dispatches the specified request. + /// </summary> + /// <param name="request">The request.</param> + public void Dispatch(T request) + { + ThrowIfDisposedOrNotOpen(); + _messageQueue.EnqueueAndDispatch(request); + } + + /// <summary> + /// Begins the dequeue operation. + /// </summary> + /// <param name="timeout">The timeout.</param> + /// <param name="callback">The callback.</param> + /// <param name="state">The state.</param> + public IAsyncResult BeginDequeue(TimeSpan timeout, AsyncCallback callback, object state) + { + return (State == CommunicationState.Opened) + ? _messageQueue.BeginDequeue(timeout, callback, state) + : new CompletedAsyncResult(callback, state); + } + + /// <summary> + /// Ends the dequeue operation. + /// </summary> + /// <param name="result">The result.</param> + /// <returns></returns> + public T EndDequeue(IAsyncResult result) + { + ThrowIfDisposedOrNotOpen(); + return _messageQueue.EndDequeue(result); + } + + /// <summary> + /// Dequeues the next message. + /// </summary> + /// <param name="timeout">The timeout.</param> + public T Dequeue(TimeSpan timeout) + { + ThrowIfDisposedOrNotOpen(); + return _messageQueue.Dequeue(timeout); + } + + /// <summary> + /// Tries to dequeue the next message. + /// </summary> + /// <param name="result">The result.</param> + /// <param name="message">The message.</param> + /// <returns></returns> + public bool TryDequeue(IAsyncResult result, out T message) + { + message = null; + TypedAsyncResult<T> completedResult = result as TypedAsyncResult<T>; + if(completedResult != null) + { + message = TypedAsyncResult<T>.End(result); + } + else if(result.CompletedSynchronously == false) + { + InputQueue<T>.AsyncQueueReader completedResult2 = result as InputQueue<T>.AsyncQueueReader; + InputQueue<T>.AsyncQueueReader.End(result, out message); + } + return result.IsCompleted; + } + + #endregion + + #region Abort + + /// <summary> + /// Inserts processing on a communication object after it transitions to the closing state due to the invocation of a synchronous abort operation. + /// </summary> + protected override void OnAbort() + { + _messageQueue.Close(); + } + + #endregion + + #region Open + + /// <summary> + /// Inserts processing on a communication object after it transitions to the opening state due to the invocation of an asynchronous open operation. + /// </summary> + /// <param name="timeout">The <see cref="T:System.Timespan"/> that specifies how long the on open operation has to complete before timing out.</param> + /// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate that receives notification of the completion of the asynchronous on open operation.</param> + /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on open operation.</param> + /// <returns> + /// The <see cref="T:System.IAsyncResult"/> that references the asynchronous on open operation. + /// </returns> + /// <exception cref="T:System.ArgumentOutOfRangeException"> + /// <paramref name="timeout"/> is less than zero.</exception> + protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) + { + OnOpen(timeout); + return new CompletedAsyncResult(callback, state); + } + + /// <summary> + /// Inserts processing on a communication object after it transitions into the opening state which must complete within a specified interval of time. + /// </summary> + /// <param name="timeout">The <see cref="T:System.Timespan"/> that specifies how long the on open operation has to complete before timing out.</param> + /// <exception cref="T:System.ArgumentOutOfRangeException"> + /// <paramref name="timeout"/> is less than zero.</exception> + /// <exception cref="T:System.TimeoutException">The interval of time specified by <paramref name="timeout"/> that was allotted for the operation was exceeded before the operation was completed.</exception> + protected override void OnOpen(TimeSpan timeout) + { + _messageQueue.Open(); + } + + /// <summary> + /// Completes an asynchronous operation on the open of a communication object. + /// </summary> + /// <param name="result">The <see cref="T:System.IAsyncResult"/> that is returned by a call to the <see cref="M:System.ServiceModel.Channels.CommunicationObject.OnEndOpen(System.IAsyncResult)"/> method.</param> + /// <exception cref="T:System.TimeoutException">The interval of time specified by <paramref name="timeout"/> that was allotted for the operation was exceeded before the operation was completed.</exception> + protected override void OnEndOpen(IAsyncResult result) + { + CompletedAsyncResult.End(result); + } + + #endregion + + #region Close + + /// <summary> + /// Inserts processing after a communication object transitions to the closing state due to the invocation of an asynchronous close operation. + /// </summary> + /// <param name="timeout">The <see cref="T:System.Timespan"/> that specifies how long the on close operation has to complete before timing out.</param> + /// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate that receives notification of the completion of the asynchronous on close operation.</param> + /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on close operation.</param> + /// <returns> + /// The <see cref="T:System.IAsyncResult"/> that references the asynchronous on close operation. + /// </returns> + /// <exception cref="T:System.ArgumentOutOfRangeException"> + /// <paramref name="timeout"/> is less than zero.</exception> + protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) + { + OnClose(timeout); + return new CompletedAsyncResult(callback, state); + } + + /// <summary> + /// Inserts processing on a communication object after it transitions to the closing state due to the invocation of a synchronous close operation. + /// </summary> + /// <param name="timeout">The <see cref="T:System.Timespan"/> that specifies how long the on close operation has to complete before timing out.</param> + /// <exception cref="T:System.ArgumentOutOfRangeException"> + /// <paramref name="timeout"/> is less than zero.</exception> + protected override void OnClose(TimeSpan timeout) + { + _messageQueue.Close(); + } + + /// <summary> + /// Completes an asynchronous operation on the close of a communication object. + /// </summary> + /// <param name="result">The <see cref="T:System.IAsyncResult"/> that is returned by a call to the <see cref="M:System.ServiceModel.Channels.CommunicationObject.OnEndClose(System.IAsyncResult)"/> method.</param> + /// <exception cref="T:System.TimeoutException">The interval of time specified by <paramref name="timeout"/> that was allotted for the operation was exceeded before the operation was completed.</exception> + protected override void OnEndClose(IAsyncResult result) + { + CompletedAsyncResult.End(result); + } + + #endregion + + #region GetProperty + + /// <summary> + /// Gets the property. + /// </summary> + /// <typeparam name="P"></typeparam> + public override P GetProperty<P>() + { + if(typeof(P) == typeof(FaultConverter)) + { + return FaultConverter.GetDefaultFaultConverter(MessageVersion.Soap12WSAddressing10) as P; + } + return base.GetProperty<P>(); + } + + #endregion + + #region Private members + + private EndpointAddress _localAddress; + private InputQueue<T> _messageQueue; + + #endregion + } +}
Added: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannel.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannel.cs?rev=726083&view=auto ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannel.cs (added) +++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannel.cs Fri Dec 12 10:25:52 2008 @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System.ServiceModel; +using System.ServiceModel.Channels; + +namespace Apache.NMS.WCF +{ + /// <summary> + /// Server-side implementation of the sessioned one-way channel. + /// </summary> + public class NmsInputSessionChannel : NmsInputChannel, IInputSessionChannel + { + #region Constructors + + /// <summary> + /// Initializes a new instance of the <see cref="NmsInputSessionChannel"/> class. + /// </summary> + /// <param name="factory">The factory that was used to create the channel.</param> + /// <param name="localAddress">The local address of the channel.</param> + internal NmsInputSessionChannel(ChannelListenerBase factory, EndpointAddress localAddress) + : base(factory, localAddress) + { + } + + #endregion + + #region ISessionChannel<IInputSession> Members + + /// <summary> + /// Gets the type of session associated with this channel. + /// </summary> + /// <value></value> + /// <returns>The type of <see cref="T:System.ServiceModel.Channels.ISession"/> associated with this channel. </returns> + public IInputSession Session + { + get { return _session; } + } + + /// <summary> + /// Internal implementation of a session, with tracking ID. + /// </summary> + private class InputSession : IInputSession, System.ServiceModel.Channels.ISession + { + private string _sessionId = NmsChannelHelper.CreateUniqueSessionId(); + + /// <summary> + /// Gets the ID that uniquely identifies the session. + /// </summary> + /// <value></value> + /// <returns>The ID that uniquely identifies the session. </returns> + public string Id + { + get { return _sessionId; } + } + } + + #endregion + + #region Private members + + private IInputSession _session = new InputSession(); + + #endregion + } +} Added: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannelListener.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannelListener.cs?rev=726083&view=auto ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannelListener.cs (added) +++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannelListener.cs Fri Dec 12 10:25:52 2008 @@ -0,0 +1,553 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.ServiceModel; +using System.ServiceModel.Channels; +using System.Text; +using Apache.NMS.Util; + +namespace Apache.NMS.WCF +{ + /// <summary> + /// Server-side listener for sessioned input channels. + /// </summary> + public class NmsInputSessionChannelListener : ChannelListenerBase<IInputSessionChannel>, IChannel + { + #region Constructors + + /// <summary> + /// Initializes a new instance of the <see cref="NmsInputSessionChannelListener"/> class. + /// </summary> + /// <param name="transportElement">The binding element.</param> + /// <param name="context">The context.</param> + internal NmsInputSessionChannelListener(NmsTransportBindingElement transportElement, BindingContext context) + : base(context.Binding) + { + _bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, (int) transportElement.MaxReceivedMessageSize); + + MessageEncodingBindingElement messageEncoderBindingElement = context.BindingParameters.Remove<MessageEncodingBindingElement>(); + _messageEncoderFactory = (messageEncoderBindingElement != null) + ? messageEncoderBindingElement.CreateMessageEncoderFactory() + : NmsConstants.DefaultMessageEncoderFactory; + + _channelQueue = new InputQueue<IInputSessionChannel>(); + _currentChannelLock = new object(); + _destinationName = transportElement.Destination; + _destinationType = transportElement.DestinationType; + _uri = new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress); + Tracer.DebugFormat("Listening to {0} at {1}/{2}", _destinationType, _uri, _destinationName); + } + + #endregion + + #region Public properties + + /// <summary> + /// Gets the message encoder factory. + /// </summary> + /// <value>The message encoder factory.</value> + public MessageEncoderFactory MessageEncoderFactory + { + get { return _messageEncoderFactory; } + } + + /// <summary> + /// Gets or sets the destination. + /// </summary> + /// <value>The destination.</value> + public string Destination + { + get { return _destinationName; } + set { _destinationName = value; } + } + + /// <summary> + /// Gets or sets the type of the destination. + /// </summary> + /// <value>The type of the destination.</value> + public DestinationType DestinationType + { + get { return _destinationType; } + set { _destinationType = value; } + } + + #endregion + + #region Implementation of CommunicationObject + + /// <summary> + /// Inserts processing on a communication object after it transitions to the closing state + /// due to the invocation of a synchronous abort operation. + /// </summary> + /// <remarks> + /// Abort can be called at any time, so we can't assume that we've been Opened successfully + /// (and thus may not have any listen sockets). + /// </remarks> + protected override void OnAbort() + { + OnClose(TimeSpan.Zero); + } + + /// <summary> + /// Inserts processing on a communication object after it transitions to the closing state due to the invocation of a synchronous close operation. + /// </summary> + /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on close operation has to complete before timing out.</param> + /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception> + protected override void OnClose(TimeSpan timeout) + { + lock(ThisLock) + { + if(_consumer != null) + { + Tracer.Debug("Listener is terminating consumer..."); + _consumer.Close(); + _consumer.Dispose(); + Tracer.Debug("Listener has terminated consumer"); + } + + if(_session != null) + { + Tracer.Debug("Listener is terminating session..."); + _session.Close(); + Tracer.Debug("Listener has terminated session"); + } + + if(_connection != null) + { + Tracer.Debug("Listener is terminating connection..."); + _connection.Stop(); + _connection.Close(); + _connection.Dispose(); + Tracer.Debug("Listener has terminated connection"); + } + + _channelQueue.Close(); + } + } + + /// <summary> + /// Inserts processing after a communication object transitions to the closing state due to the invocation of an asynchronous close operation. + /// </summary> + /// <returns> + /// The <see cref="T:System.IAsyncResult" /> that references the asynchronous on close operation. + /// </returns> + /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on close operation has to complete before timing out.</param> + /// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives notification of the completion of the asynchronous on close operation.</param> + /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on close operation.</param> + /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception> + protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) + { + OnClose(timeout); + return new CompletedAsyncResult(callback, state); + } + + /// <summary> + /// Completes an asynchronous operation on the close of a communication object. + /// </summary> + /// <param name="result">The <see cref="T:System.IAsyncResult" /> that is returned by a call to the <see cref="M:System.ServiceModel.Channels.CommunicationObject.OnEndClose(System.IAsyncResult)" /> method.</param> + protected override void OnEndClose(IAsyncResult result) + { + CompletedAsyncResult.End(result); + } + + /// <summary> + /// Inserts processing on a communication object after it transitions into the opening state which must complete within a specified interval of time. + /// </summary> + /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on open operation has to complete before timing out.</param> + /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception> + /// <exception cref="T:System.TimeoutException">The interval of time specified by <paramref name="timeout" /> that was allotted for the operation was exceeded before the operation was completed.</exception> + protected override void OnOpen(TimeSpan timeout) + { + if(Uri == null) + { + throw new InvalidOperationException("Uri must be set before ChannelListener is opened."); + } + NmsChannelHelper.ValidateTimeout(timeout); + } + + /// <summary> + /// Inserts processing on a communication object after it transitions to the opening state due to the invocation of an asynchronous open operation. + /// </summary> + /// <returns> + /// The <see cref="T:System.IAsyncResult" /> that references the asynchronous on open operation. + /// </returns> + /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on open operation has to complete before timing out.</param> + /// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives notification of the completion of the asynchronous on open operation.</param> + /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on open operation.</param> + /// <exception cref="T:System.ArgumentOutOfRangeException"><paramref name="timeout" /> is less than zero.</exception> + protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) + { + NmsChannelHelper.ValidateTimeout(timeout); + OnOpen(timeout); + return new CompletedAsyncResult(callback, state); + } + + /// <summary> + /// Completes an asynchronous operation on the open of a communication object. + /// </summary> + /// <param name="result">The <see cref="T:System.IAsyncResult" /> that is returned by a call to the <see cref="M:System.ServiceModel.Channels.CommunicationObject.OnEndOpen(System.IAsyncResult)" /> method.</param> + protected override void OnEndOpen(IAsyncResult result) + { + CompletedAsyncResult.End(result); + } + + #endregion + + #region Implementation of ChannelListenerBase + + /// <summary> + /// When implemented in derived class, gets the URI on which the channel listener listens for an incoming channel. + /// </summary> + /// <returns> + /// The <see cref="T:System.Uri" /> on which the channel listener listens for incoming channels. + /// </returns> + public override Uri Uri + { + get { return _uri; } + } + + /// <summary> + /// When overridden in a derived class, provides a point of extensibility when waiting for a channel to arrive. + /// </summary> + /// <returns> + /// true if the method completed before the interval of time specified by the <paramref name="timeout" /> expired; otherwise false. + /// </returns> + /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on wait for a channel operation has to complete before timing out.</param> + protected override bool OnWaitForChannel(TimeSpan timeout) + { + NmsChannelHelper.ValidateTimeout(timeout); + return _channelQueue.WaitForItem(timeout); + } + + /// <summary> + /// When implemented in a derived class, provides a point of extensibility when starting to wait for a channel to arrive. + /// </summary> + /// <returns> + /// The <see cref="T:System.IAsyncResult" /> that references the asynchronous on begin wait operation. + /// </returns> + /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on begin wait operation has to complete before timing out.</param> + /// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives the notification of the asynchronous operation on begin wait completion.</param> + /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on begin wait operation.</param> + protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state) + { + NmsChannelHelper.ValidateTimeout(timeout); + return _channelQueue.BeginWaitForItem(timeout, callback, state); + } + + /// <summary> + /// When implemented in a derived class, provides a point of extensibility when ending the waiting for a channel to arrive. + /// </summary> + /// <returns> + /// true if the method completed before the timeout expired; otherwise false. + /// </returns> + /// <param name="result">The <see cref="T:System.IAsyncResult" /> returned by a call to the <see cref="M:System.ServiceModel.Channels.ChannelListenerBase.OnBeginWaitForChannel(System.TimeSpan,System.AsyncCallback,System.Object)" /> method.</param> + protected override bool OnEndWaitForChannel(IAsyncResult result) + { + return _channelQueue.EndWaitForItem(result); + } + + /// <summary> + /// When implemented in a derived class, provides an extensibility point when accepting a channel. + /// </summary> + /// <returns> + /// The <see cref="T:System.ServiceModel.Channels.IChannel" /> accepted. + /// </returns> + /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the accept channel operation has to complete before timing out.</param> + protected override IInputSessionChannel OnAcceptChannel(TimeSpan timeout) + { + Tracer.Debug("Accepting channel"); + NmsChannelHelper.ValidateTimeout(timeout); + if(!IsDisposed) + { + EnsureChannelAvailable(); + } + + IInputSessionChannel channel; + if(_channelQueue.Dequeue(timeout, out channel)) + { + return channel; + } + throw new TimeoutException(String.Format("Accept on listener at address {0} timed out after {1}.", Uri.AbsoluteUri, timeout)); + } + + /// <summary> + /// When implemented in a derived class, provides an asynchronous extensibility point when beginning to accept a channel. + /// </summary> + /// <returns> + /// The <see cref="T:System.IAsyncResult" /> that references the asynchronous accept channel operation. + /// </returns> + /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the accept channel operation has to complete before timing out.</param> + /// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate that receives the notification of the asynchronous completion of the accept channel operation.</param> + /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous accept channel operation.</param> + protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state) + { + NmsChannelHelper.ValidateTimeout(timeout); + if(!IsDisposed) + { + EnsureChannelAvailable(); + } + return _channelQueue.BeginDequeue(timeout, callback, state); + } + + /// <summary> + /// When implemented in a derived class, provides an asynchronous extensibility point when completing the acceptance a channel. + /// </summary> + /// <returns> + /// The <see cref="T:System.ServiceModel.Channels.IChannel" /> accepted by the listener. + /// </returns> + /// <param name="result">The <see cref="T:System.IAsyncResult" /> returned by a call to the <see cref="M:System.ServiceModel.Channels.ChannelListenerBase`1.OnBeginAcceptChannel(System.TimeSpan,System.AsyncCallback,System.Object)" /> method.</param> + protected override IInputSessionChannel OnEndAcceptChannel(IAsyncResult result) + { + IInputSessionChannel channel; + if(_channelQueue.EndDequeue(result, out channel)) + { + return channel; + } + throw new TimeoutException(); + } + + #endregion + + /// <summary> + /// Dispatches the callback. + /// </summary> + /// <param name="state">The state.</param> + internal void DispatchCallback(object state) + { + Dispatch((Message) state); + } + + /// <summary> + /// Matches an incoming message to its waiting listener, + /// using the FilterTable to dispatch the message to the correct + /// listener. If no listener is waiting for the message, it is silently + /// discarded. + /// </summary> + internal void Dispatch(Message message) + { + if(message == null) + { + return; + } + + try + { + NmsInputSessionChannel newChannel; + bool channelCreated = CreateOrRetrieveChannel(out newChannel); + + Tracer.Debug("Dispatching incoming message"); + newChannel.Dispatch(message); + + if(channelCreated) + { + //Hand the channel off to whomever is waiting for AcceptChannel() to complete + Tracer.Debug("Handing off channel"); + _channelQueue.EnqueueAndDispatch(newChannel); + } + } + catch(Exception e) + { + Tracer.ErrorFormat("Error dispatching Message: {0}", e.ToString()); + } + } + + /// <summary> + /// Creates or retrieves the channel. + /// </summary> + /// <param name="newChannel">The channel.</param> + private bool CreateOrRetrieveChannel(out NmsInputSessionChannel newChannel) + { + bool channelCreated = false; + + if((newChannel = _currentChannel) == null) + { + lock(_currentChannelLock) + { + if((newChannel = _currentChannel) == null) + { + newChannel = CreateNmsChannel(Uri); + newChannel.Closed += OnChannelClosed; + _currentChannel = newChannel; + channelCreated = true; + } + } + } + + return channelCreated; + } + + /// <summary> + /// Called when the channel is closed. + /// </summary> + /// <param name="sender">The sender.</param> + /// <param name="args">The <see cref="System.EventArgs"/> instance containing the event data.</param> + private void OnChannelClosed(object sender, EventArgs args) + { + NmsInputSessionChannel channel = (NmsInputSessionChannel) sender; + + lock(_currentChannelLock) + { + if(channel == _currentChannel) + { + _currentChannel = null; + } + } + } + + /// <summary> + /// Creates the <see cref="NmsInputChannel" /> that will wait for inbound messages. + /// </summary> + /// <param name="uri">The URI for the message queue.</param> + private NmsInputSessionChannel CreateNmsChannel(Uri uri) + { + _connection = OpenConnection(uri); + _session = OpenSession(_connection); + _destination = SessionUtil.GetDestination(_session, Destination, DestinationType); + _consumer = CreateConsumer(_session, _destination); + + EndpointAddress address = new EndpointAddress(uri); + return new NmsInputSessionChannel(this, address); + } + + /// <summary> + /// Opens the connection to the message broker. + /// </summary> + /// <param name="uri">The URI.</param> + /// <returns>An active connection to the ActiveMQ message broker specified by the URI; + /// exceptions will be caught by the attached ExceptionListener.</returns> + private IConnection OpenConnection(Uri uri) + { + IConnection connection = ConnectionFactoryManager.GetInstance().CreateConnection(uri); + connection.ExceptionListener += OnExceptionThrown; + connection.Start(); + Tracer.Debug("Connection open"); + return connection; + } + + /// <summary> + /// Opens a session to communicate with a message queue. + /// </summary> + /// <param name="connection">The connection to the ActiveMQ message broker.</param> + /// <returns>A session.</returns> + /// <exception cref="InvalidOperationException">the <paramref name="connection" /> has not yet + /// been started.</exception> + private ISession OpenSession(IConnection connection) + { + if(!connection.IsStarted) + { + throw new InvalidOperationException("The connection has not yet been opened"); + } + + Tracer.Debug("Opening session..."); + ISession session = connection.CreateSession(); + Tracer.Debug("Session open"); + return session; + } + + /// <summary> + /// Creates the consumer of messages received on the <paramref name="session"/>. + /// </summary> + /// <param name="session">The session.</param> + /// <param name="destination">The destination.</param> + /// <returns>A consumer for any messages received during the session; + /// messages will be consumed by the attached Listener.</returns> + private IMessageConsumer CreateConsumer(ISession session, IDestination destination) + { + Tracer.Debug("Creating message listener..."); + IMessageConsumer consumer = session.CreateConsumer(destination); + consumer.Listener += OnReceiveMessage; + Tracer.Debug("Created message listener"); + return consumer; + } + + /// <summary> + /// Event handler that processes a received message. + /// </summary> + /// <param name="message">The message.</param> + private void OnReceiveMessage(IMessage message) + { + Tracer.Debug("Decoding message"); + string soapMsg = ((ITextMessage) message).Text; + byte[] buffer = Encoding.ASCII.GetBytes(soapMsg); + int dataLength = buffer.Length; + byte[] data1 = _bufferManager.TakeBuffer(dataLength); + Array.Copy(buffer, data1, dataLength); + + ArraySegment<byte> data = new ArraySegment<byte>(data1, 0, dataLength); + + byte[] msgContents = new byte[data.Count]; + Array.Copy(data.Array, data.Offset, msgContents, 0, msgContents.Length); + Message msg = _messageEncoderFactory.Encoder.ReadMessage(data, _bufferManager); + + Tracer.Debug(msg); + Dispatch(msg); + } + + /// <summary> + /// Called when an exception is thrown by the ActiveMQ listener. + /// </summary> + /// <remarks> + /// <see cref="NMSException" />s will be caught and logged; all other exceptions will + /// be thrown back up to the WCF service. + /// </remarks> + /// <param name="exception">The exception that was thrown.</param> + private void OnExceptionThrown(Exception exception) + { + if(exception is NMSException) + { + Tracer.ErrorFormat("{0} thrown : {1}\n{2}", + exception.GetType().Name, + exception.Message, + exception.StackTrace); + return; + } + + // TODO: can we recover from the exception? Do we convert to WCF exceptions? + throw exception; + } + + /// <summary> + /// Guarantees that a channel is attached to this listener. + /// </summary> + private void EnsureChannelAvailable() + { + NmsInputSessionChannel newChannel; + if(CreateOrRetrieveChannel(out newChannel)) + { + _channelQueue.EnqueueAndDispatch(newChannel); + } + } + + #region Private members + + private readonly Uri _uri; + private IConnection _connection; + private ISession _session; + private IDestination _destination; + private IMessageConsumer _consumer; + private readonly InputQueue<IInputSessionChannel> _channelQueue; + private NmsInputSessionChannel _currentChannel; + private readonly object _currentChannelLock; + private readonly MessageEncoderFactory _messageEncoderFactory; + private readonly BufferManager _bufferManager; + private string _destinationName; + private DestinationType _destinationType; + + #endregion + } +} Modified: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannel.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannel.cs?rev=726083&r1=726082&r2=726083&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannel.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannel.cs Fri Dec 12 10:25:52 2008 @@ -20,28 +20,30 @@ using System.ServiceModel.Channels; using System.Text; using System.Xml; +using Apache.NMS.Util; namespace Apache.NMS.WCF { /// <summary> - /// Channel for sending messages. + /// Client-side implementation of the sessionless one-way channel. /// </summary> - public class NmsOutputChannel : NmsChannelBase, IOutputChannel + public class NmsOutputChannel : NmsOutputChannelBase, IOutputChannel { #region Constructors /// <summary> /// Initializes a new instance of the <see cref="NmsOutputChannel"/> class. /// </summary> + /// <param name="factory">The factory that created the channel.</param> + /// <param name="remoteAddress">The remote address of the channel.</param> + /// <param name="via">The URI that contains the transport address to which messages are sent on the output channel.</param> /// <param name="bufferManager">The buffer manager.</param> /// <param name="encoderFactory">The encoder factory.</param> - /// <param name="address">The address.</param> - /// <param name="parent">The parent.</param> - /// <param name="via">The via.</param> - public NmsOutputChannel(BufferManager bufferManager, MessageEncoderFactory encoderFactory, EndpointAddress address, NmsChannelFactory parent, Uri via) - : base(bufferManager, encoderFactory, address, parent, parent.Destination, parent.DestinationType) + /// <param name="destination">The name of the ActiveMQ destination.</param> + /// <param name="destinationType">The type of the ActiveMQ destination (either a queue or a topic, permanent or temporary).</param> + public NmsOutputChannel(ChannelManagerBase factory, EndpointAddress remoteAddress, Uri via, BufferManager bufferManager, MessageEncoderFactory encoderFactory, string destination, DestinationType destinationType) + : base(factory, remoteAddress, via, bufferManager, encoderFactory, destination, destinationType) { - _via = via; _connection = ConnectionFactoryManager.GetInstance().CreateConnection(via); _connection.Start(); } @@ -67,16 +69,14 @@ public void Send(Message message, TimeSpan timeout) { ThrowIfDisposedOrNotOpen(); + RemoteAddress.ApplyTo(message); using(NMS.ISession session = _connection.CreateSession()) { - IDestination destination = NmsChannelHelper.GetDestination(session, Destination, DestinationType); + IDestination destination = SessionUtil.GetDestination(session, Destination, DestinationType); using(IMessageProducer producer = session.CreateProducer(destination)) { producer.Persistent = true; - message.Headers.To = RemoteAddress.Uri; - //TODO: check if this is synonymous with the above operation - //RemoteAddress.ApplyTo(message); ITextMessage request = session.CreateTextMessage(TranslateMessage(message)); producer.Send(request); @@ -94,7 +94,7 @@ /// <param name="message">The message to be translated.</param> private string TranslateMessage(Message message) { - return (Encoder.MessageVersion == MessageVersion.Soap11) + return (this.Encoder.MessageVersion == MessageVersion.Soap11) ? TranslateMessageAsSoap11(message) : TranslateMessageAsSoap12(message); } @@ -169,17 +169,6 @@ NmsAsyncResult.End(result); } - /// <summary> - /// Gets the URI that contains the transport address to which messages are sent on the output channel. - /// </summary> - /// <returns> - /// The <see cref="T:System.Uri" /> that contains the transport address to which messages are sent on the output channel. - /// </returns> - public Uri Via - { - get { return _via; } - } - #endregion #region Implementation of CommunicationObject @@ -307,9 +296,8 @@ #region Private members - private readonly Uri _via; private readonly IConnection _connection; #endregion } -} \ No newline at end of file +} Added: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannelBase.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannelBase.cs?rev=726083&view=auto ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannelBase.cs (added) +++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannelBase.cs Fri Dec 12 10:25:52 2008 @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.ServiceModel; +using System.ServiceModel.Channels; + +namespace Apache.NMS.WCF +{ + /// <summary> + /// Base class for NMS output channels. + /// </summary> + public abstract class NmsOutputChannelBase : ChannelBase + { + #region Constructors + + /// <summary> + /// Initializes a new instance of the <see cref="NmsOutputChannelBase"/> class. + /// </summary> + /// <param name="factory">The factory that created the channel.</param> + /// <param name="remoteAddress">The remote address for the channel.</param> + /// <param name="via">The URI that contains the transport address to which messages are sent on the output channel.</param> + /// <param name="bufferManager">The buffer manager.</param> + /// <param name="encoderFactory">The encoder factory.</param> + /// <param name="destination">The name of the ActiveMQ destination.</param> + /// <param name="destinationType">The type of the ActiveMQ destination (either a queue or a topic, permanent or temporary).</param> + internal NmsOutputChannelBase(ChannelManagerBase factory, EndpointAddress remoteAddress, Uri via, BufferManager bufferManager, MessageEncoderFactory encoderFactory, string destination, DestinationType destinationType) + : base(factory) + { + _remoteAddress = remoteAddress; + _via = via; + _bufferManager = bufferManager; + _encoder = encoderFactory; + _destination = destination; + _destinationType = destinationType; + } + + #endregion + + #region NullRequestContextCollection + + //public NmsAsyncRequestContextCollection PendingRequests + //{ + // get { return _pendingRequests; } + //} + + #endregion + + #region Public properties + + /// <summary> + /// Gets the remote address. + /// </summary> + /// <value>The remote address.</value> + public EndpointAddress RemoteAddress + { + get { return _remoteAddress; } + } + + /// <summary> + /// Gets the routing address. + /// </summary> + /// <value>The routing address.</value> + public Uri Via + { + get { return _via; } + } + + /// <summary> + /// Gets the buffer manager. + /// </summary> + /// <value>The buffer manager.</value> + public BufferManager BufferManager + { + get { return _bufferManager; } + } + + /// <summary> + /// Gets the encoder. + /// </summary> + /// <value>The encoder.</value> + public MessageEncoder Encoder + { + get { return _encoder.Encoder; } + } + + /// <summary> + /// Gets the name of the destination (either a queue or a topic). + /// </summary> + /// <value>The name of the destination.</value> + public string Destination + { + get { return _destination; } + } + + /// <summary> + /// Gets the type of the destination. + /// </summary> + /// <value>The type of the destination.</value> + public DestinationType DestinationType + { + get { return _destinationType; } + } + + #endregion + + #region Abort + + /// <summary> + /// Inserts processing on a communication object after it transitions to the closing state due to the invocation of a synchronous abort operation. + /// </summary> + protected override void OnAbort() + { + //_pendingRequests.AbortAll(); + } + + #endregion + + #region Close + + /// <summary> + /// Inserts processing after a communication object transitions to the closing state due to the invocation of an asynchronous close operation. + /// </summary> + /// <param name="timeout">The <see cref="T:System.Timespan"/> that specifies how long the on close operation has to complete before timing out.</param> + /// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate that receives notification of the completion of the asynchronous on close operation.</param> + /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on close operation.</param> + /// <returns> + /// The <see cref="T:System.IAsyncResult"/> that references the asynchronous on close operation. + /// </returns> + /// <exception cref="T:System.ArgumentOutOfRangeException"> + /// <paramref name="timeout"/> is less than zero.</exception> + protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) + { + OnClose(timeout); + return new CompletedAsyncResult(callback, state); + } + + /// <summary> + /// Inserts processing on a communication object after it transitions to the closing state due to the invocation of a synchronous close operation. + /// </summary> + /// <param name="timeout">The <see cref="T:System.Timespan"/> that specifies how long the on close operation has to complete before timing out.</param> + /// <exception cref="T:System.ArgumentOutOfRangeException"> + /// <paramref name="timeout"/> is less than zero.</exception> + protected override void OnClose(TimeSpan timeout) + { + //_pendingRequests.AbortAll(); + } + + /// <summary> + /// Completes an asynchronous operation on the close of a communication object. + /// </summary> + /// <param name="result">The <see cref="T:System.IAsyncResult"/> that is returned by a call to the <see cref="M:System.ServiceModel.Channels.CommunicationObject.OnEndClose(System.IAsyncResult)"/> method.</param> + /// <exception cref="T:System.TimeoutException">The interval of time specified by <paramref name="timeout"/> that was allotted for the operation was exceeded before the operation was completed.</exception> + protected override void OnEndClose(IAsyncResult result) + { + CompletedAsyncResult.End(result); + } + + #endregion + + #region Open + + /// <summary> + /// Inserts processing on a communication object after it transitions to the opening state due to the invocation of an asynchronous open operation. + /// </summary> + /// <param name="timeout">The <see cref="T:System.Timespan"/> that specifies how long the on open operation has to complete before timing out.</param> + /// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate that receives notification of the completion of the asynchronous on open operation.</param> + /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on open operation.</param> + /// <returns> + /// The <see cref="T:System.IAsyncResult"/> that references the asynchronous on open operation. + /// </returns> + /// <exception cref="T:System.ArgumentOutOfRangeException"> + /// <paramref name="timeout"/> is less than zero.</exception> + protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) + { + OnOpen(timeout); + return new CompletedAsyncResult(callback, state); + } + + /// <summary> + /// Inserts processing on a communication object after it transitions into the opening state which must complete within a specified interval of time. + /// </summary> + /// <param name="timeout">The <see cref="T:System.Timespan"/> that specifies how long the on open operation has to complete before timing out.</param> + /// <exception cref="T:System.ArgumentOutOfRangeException"> + /// <paramref name="timeout"/> is less than zero.</exception> + /// <exception cref="T:System.TimeoutException">The interval of time specified by <paramref name="timeout"/> that was allotted for the operation was exceeded before the operation was completed.</exception> + protected override void OnOpen(TimeSpan timeout) + { + } + + /// <summary> + /// Completes an asynchronous operation on the open of a communication object. + /// </summary> + /// <param name="result">The <see cref="T:System.IAsyncResult"/> that is returned by a call to the <see cref="M:System.ServiceModel.Channels.CommunicationObject.OnEndOpen(System.IAsyncResult)"/> method.</param> + /// <exception cref="T:System.TimeoutException">The interval of time specified by <paramref name="timeout"/> that was allotted for the operation was exceeded before the operation was completed.</exception> + protected override void OnEndOpen(IAsyncResult result) + { + CompletedAsyncResult.End(result); + } + + #endregion + + #region GetProperty + + /// <summary> + /// Gets the property. + /// </summary> + /// <typeparam name="T"></typeparam> + /// <returns></returns> + public override T GetProperty<T>() + { + if(typeof(T) == typeof(FaultConverter)) + { + return FaultConverter.GetDefaultFaultConverter(MessageVersion.Soap12WSAddressing10) as T; + } + return base.GetProperty<T>(); + } + + #endregion + + #region Private members + + private EndpointAddress _remoteAddress; + private Uri _via; + private BufferManager _bufferManager; + private MessageEncoderFactory _encoder; + private string _destination; + private DestinationType _destinationType; + + // for request/reply pattern + //NullAsyncRequestContextCollection _pendingRequests = new NullAsyncRequestContextCollection(); + + #endregion + } +} Added: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputSessionChannel.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputSessionChannel.cs?rev=726083&view=auto ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputSessionChannel.cs (added) +++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputSessionChannel.cs Fri Dec 12 10:25:52 2008 @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.ServiceModel; +using System.ServiceModel.Channels; + +namespace Apache.NMS.WCF +{ + /// <summary> + /// Client-side implementation of the sessioned one-way channel. + /// </summary> + internal class NmsOutputSessionChannel : NmsOutputChannel, IOutputSessionChannel + { + #region Constructors + + /// <summary> + /// Initializes a new instance of the <see cref="NmsOutputSessionChannel"/> class. + /// </summary> + /// <param name="factory">The factory that created this channel.</param> + /// <param name="address">The address of this channel.</param> + /// <param name="via">The URI that contains the transport address to which messages are sent on the output channel.</param> + /// <param name="bufferManager">The buffer manager.</param> + /// <param name="encoderFactory">The encoder factory.</param> + /// <param name="destination">The name of the ActiveMQ destination.</param> + /// <param name="destinationType">The type of the ActiveMQ destination (either a queue or a topic, permanent or temporary).</param> + public NmsOutputSessionChannel(ChannelManagerBase factory, Uri via, EndpointAddress address, BufferManager bufferManager, MessageEncoderFactory encoderFactory, string destination, DestinationType destinationType) + : base(factory, address, via, bufferManager, encoderFactory, destination, destinationType) + { + } + + #endregion + + #region ISessionChannel<IOutputSession> Members + + /// <summary> + /// Gets the type of session associated with this channel. + /// </summary> + /// <value></value> + /// <returns>The type of <see cref="T:System.ServiceModel.Channels.ISession"/> associated with this channel. </returns> + public IOutputSession Session + { + get { return _session; } + } + + /// <summary> + /// Internal implementation of a session, with tracking ID. + /// </summary> + private class OutputSession : IOutputSession, System.ServiceModel.Channels.ISession + { + private string _sessionId = NmsChannelHelper.CreateUniqueSessionId(); + + /// <summary> + /// Gets the ID that uniquely identifies the session. + /// </summary> + /// <value></value> + /// <returns>The ID that uniquely identifies the session. </returns> + public string Id + { + get { return _sessionId; } + } + } + + #endregion + + #region Private members + + private IOutputSession _session = new OutputSession(); + + #endregion + } +} Modified: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/vs2008-nms-wcf.csproj URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/vs2008-nms-wcf.csproj?rev=726083&r1=726082&r2=726083&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/vs2008-nms-wcf.csproj (original) +++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/vs2008-nms-wcf.csproj Fri Dec 12 10:25:52 2008 @@ -66,12 +66,13 @@ </NoWarn> </PropertyGroup> <ItemGroup> - <Reference Include="Apache.NMS, Version=1.1.0.0, Culture=neutral, processorArchitecture=MSIL"> + <Reference Include="Apache.NMS, Version=1.1.0.0, Culture=neutral, PublicKeyToken=2a329723af30bc8d, processorArchitecture=MSIL"> <SpecificVersion>False</SpecificVersion> <HintPath>lib\Apache.NMS\net-3.5\Apache.NMS.dll</HintPath> </Reference> <Reference Include="System" /> <Reference Include="System.configuration" /> + <Reference Include="System.Data" /> <Reference Include="System.Runtime.Serialization"> <RequiredTargetFramework>3.0</RequiredTargetFramework> </Reference> @@ -121,12 +122,16 @@ <Compile Include="src\main\csharp\ConnectionFactoryManager.cs" /> <Compile Include="src\main\csharp\InputQueue.cs" /> <Compile Include="src\main\csharp\NmsAsyncResult.cs" /> - <Compile Include="src\main\csharp\NmsChannelBase.cs" /> + <Compile Include="src\main\csharp\NmsInputQueueChannelBase.cs" /> <Compile Include="src\main\csharp\NmsChannelFactory.cs" /> <Compile Include="src\main\csharp\NmsChannelHelper.cs" /> - <Compile Include="src\main\csharp\NmsChannelListener.cs" /> + <Compile Include="src\main\csharp\NmsInputChannelListener.cs" /> <Compile Include="src\main\csharp\NmsInputChannel.cs" /> + <Compile Include="src\main\csharp\NmsInputSessionChannel.cs" /> + <Compile Include="src\main\csharp\NmsInputSessionChannelListener.cs" /> <Compile Include="src\main\csharp\NmsOutputChannel.cs" /> + <Compile Include="src\main\csharp\NmsOutputChannelBase.cs" /> + <Compile Include="src\main\csharp\NmsOutputSessionChannel.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSHARP.Targets" /> <PropertyGroup> @@ -135,4 +140,4 @@ <PostBuildEvent>cd $(ProjectDir) nant -nologo -q install-all -D:compile.skip=true</PostBuildEvent> </PropertyGroup> -</Project> \ No newline at end of file +</Project>
