Added: 
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputQueueChannelBase.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputQueueChannelBase.cs?rev=726083&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputQueueChannelBase.cs
 (added)
+++ 
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputQueueChannelBase.cs
 Fri Dec 12 10:25:52 2008
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.ServiceModel;
+using System.ServiceModel.Channels;
+
+namespace Apache.NMS.WCF
+{
+       /// <summary>
+       /// Base class for NMS input channels. Owns an input queue of <typeparamref name="T"/>
+       /// items that is opened, closed and aborted together with the channel.
+       /// </summary>
+       /// <typeparam name="T">The reference type of the items queued on this channel.</typeparam>
+       public abstract class NmsInputQueueChannelBase<T> : ChannelBase where T : class
+       {
+               #region Constructors
+
+               /// <summary>
+               /// Initializes a new instance of the <see cref="NmsInputQueueChannelBase&lt;T&gt;"/> class
+               /// with an empty input queue.
+               /// </summary>
+               /// <param name="factory">The factory that was used to create the channel.</param>
+               /// <param name="localAddress">The local address of the channel.</param>
+               public NmsInputQueueChannelBase(ChannelListenerBase factory, EndpointAddress localAddress)
+                       : base(factory)
+               {
+                       _localAddress = localAddress;
+                       _messageQueue = new InputQueue<T>();
+               }
+
+               #endregion
+
+               #region Public properties
+
+               /// <summary>
+               /// Gets the local address that was supplied to the constructor.
+               /// </summary>
+               /// <value>The local address.</value>
+               public EndpointAddress LocalAddress
+               {
+                       get { return _localAddress; }
+               }
+
+               #endregion
+
+               #region Messaging
+
+               /// <summary>
+               /// Gets the pending message count as reported by the underlying input queue.
+               /// </summary>
+               /// <value>The pending message count.</value>
+               public int PendingMessageCount
+               {
+                       get
+                       {
+                               return _messageQueue.PendingCount;
+                       }
+               }
+
+               /// <summary>
+               /// Dispatches the specified request by enqueueing it on the input queue.
+               /// </summary>
+               /// <remarks>
+               /// <c>ThrowIfDisposedOrNotOpen</c> runs first, so nothing is enqueued when that check throws.
+               /// </remarks>
+               /// <param name="request">The request.</param>
+               public void Dispatch(T request)
+               {
+                       ThrowIfDisposedOrNotOpen();
+                       _messageQueue.EnqueueAndDispatch(request);
+               }
+
+               /// <summary>
+               /// Begins the dequeue operation.
+               /// </summary>
+               /// <param name="timeout">The timeout.</param>
+               /// <param name="callback">The callback.</param>
+               /// <param name="state">The state.</param>
+               /// <returns>
+               /// The queue's asynchronous result when the channel state is <c>Opened</c>; otherwise a new
+               /// <c>CompletedAsyncResult</c> built from <paramref name="callback"/> and <paramref name="state"/>,
+               /// in which case the queue is not touched.
+               /// </returns>
+               public IAsyncResult BeginDequeue(TimeSpan timeout, AsyncCallback callback, object state)
+               {
+                       return (State == CommunicationState.Opened)
+                               ? _messageQueue.BeginDequeue(timeout, callback, state)
+                               : new CompletedAsyncResult(callback, state);
+               }
+
+               /// <summary>
+               /// Ends the dequeue operation.
+               /// </summary>
+               /// <param name="result">The result.</param>
+               /// <returns>The item returned by the queue's <c>EndDequeue</c>.</returns>
+               public T EndDequeue(IAsyncResult result)
+               {
+                       ThrowIfDisposedOrNotOpen();
+                       return _messageQueue.EndDequeue(result);
+               }
+
+               /// <summary>
+               /// Dequeues the next message.
+               /// </summary>
+               /// <param name="timeout">The timeout, passed through to the queue.</param>
+               /// <returns>The item returned by the queue's <c>Dequeue</c>.</returns>
+               public T Dequeue(TimeSpan timeout)
+               {
+                       ThrowIfDisposedOrNotOpen();
+                       return _messageQueue.Dequeue(timeout);
+               }
+
+               /// <summary>
+               /// Tries to dequeue the next message from a previously started asynchronous operation.
+               /// </summary>
+               /// <param name="result">The asynchronous result to complete.</param>
+               /// <param name="message">
+               /// The dequeued message, or <c>null</c> when <paramref name="result"/> is neither a
+               /// <c>TypedAsyncResult&lt;T&gt;</c> nor a result that completed asynchronously.
+               /// </param>
+               /// <returns>The value of <paramref name="result"/>'s <c>IsCompleted</c> property.</returns>
+               public bool TryDequeue(IAsyncResult result, out T message)
+               {
+                       message = null;
+                       if(result is TypedAsyncResult<T>)
+                       {
+                               message = TypedAsyncResult<T>.End(result);
+                       }
+                       else if(result.CompletedSynchronously == false)
+                       {
+                               // NOTE(review): any other result that did not complete synchronously is
+                               // presumed to be an InputQueue reader - confirm against InputQueue<T>.
+                               InputQueue<T>.AsyncQueueReader.End(result, out message);
+                       }
+                       return result.IsCompleted;
+               }
+
+               #endregion
+
+               #region Abort
+
+               /// <summary>
+               /// Closes the input queue when the channel is aborted.
+               /// </summary>
+               protected override void OnAbort()
+               {
+                       _messageQueue.Close();
+               }
+
+               #endregion
+
+               #region Open
+
+               /// <summary>
+               /// Handles an asynchronous open by performing the open synchronously through
+               /// <see cref="OnOpen"/> and returning an already-constructed <c>CompletedAsyncResult</c>.
+               /// </summary>
+               /// <param name="timeout">The <see cref="T:System.TimeSpan"/> that is forwarded to <see cref="OnOpen"/>.</param>
+               /// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate handed to the returned result.</param>
+               /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on open operation.</param>
+               /// <returns>
+               /// The <see cref="T:System.IAsyncResult"/> that references the asynchronous on open operation.
+               /// </returns>
+               protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
+               {
+                       OnOpen(timeout);
+                       return new CompletedAsyncResult(callback, state);
+               }
+
+               /// <summary>
+               /// Opens the input queue when the channel transitions into the opening state.
+               /// </summary>
+               /// <param name="timeout">The <see cref="T:System.TimeSpan"/> allotted for the open; this implementation does not read it.</param>
+               protected override void OnOpen(TimeSpan timeout)
+               {
+                       _messageQueue.Open();
+               }
+
+               /// <summary>
+               /// Completes an asynchronous open by ending the supplied result through <c>CompletedAsyncResult.End</c>.
+               /// </summary>
+               /// <param name="result">The <see cref="T:System.IAsyncResult"/> that was returned by <see cref="OnBeginOpen"/>.</param>
+               protected override void OnEndOpen(IAsyncResult result)
+               {
+                       CompletedAsyncResult.End(result);
+               }
+
+               #endregion
+
+               #region Close
+
+               /// <summary>
+               /// Handles an asynchronous close by performing the close synchronously through
+               /// <see cref="OnClose"/> and returning an already-constructed <c>CompletedAsyncResult</c>.
+               /// </summary>
+               /// <param name="timeout">The <see cref="T:System.TimeSpan"/> that is forwarded to <see cref="OnClose"/>.</param>
+               /// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate handed to the returned result.</param>
+               /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on close operation.</param>
+               /// <returns>
+               /// The <see cref="T:System.IAsyncResult"/> that references the asynchronous on close operation.
+               /// </returns>
+               protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
+               {
+                       OnClose(timeout);
+                       return new CompletedAsyncResult(callback, state);
+               }
+
+               /// <summary>
+               /// Closes the input queue when the channel is closed synchronously.
+               /// </summary>
+               /// <param name="timeout">The <see cref="T:System.TimeSpan"/> allotted for the close; this implementation does not read it.</param>
+               protected override void OnClose(TimeSpan timeout)
+               {
+                       _messageQueue.Close();
+               }
+
+               /// <summary>
+               /// Completes an asynchronous close by ending the supplied result through <c>CompletedAsyncResult.End</c>.
+               /// </summary>
+               /// <param name="result">The <see cref="T:System.IAsyncResult"/> that was returned by <see cref="OnBeginClose"/>.</param>
+               protected override void OnEndClose(IAsyncResult result)
+               {
+                       CompletedAsyncResult.End(result);
+               }
+
+               #endregion
+
+               #region GetProperty
+
+               /// <summary>
+               /// Gets a typed property from the channel.
+               /// </summary>
+               /// <typeparam name="P">The type of the requested property.</typeparam>
+               /// <returns>
+               /// The default fault converter for <c>MessageVersion.Soap12WSAddressing10</c> when
+               /// <typeparamref name="P"/> is <see cref="FaultConverter"/>; otherwise whatever the base class returns.
+               /// </returns>
+               public override P GetProperty<P>()
+               {
+                       if(typeof(P) == typeof(FaultConverter))
+                       {
+                               return FaultConverter.GetDefaultFaultConverter(MessageVersion.Soap12WSAddressing10) as P;
+                       }
+                       return base.GetProperty<P>();
+               }
+
+               #endregion
+
+               #region Private members
+
+               // Both fields are assigned exactly once, in the constructor.
+               private readonly EndpointAddress _localAddress;
+               private readonly InputQueue<T> _messageQueue;
+
+               #endregion
+       }
+}

Added: 
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannel.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannel.cs?rev=726083&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannel.cs
 (added)
+++ 
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannel.cs
 Fri Dec 12 10:25:52 2008
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.ServiceModel;
+using System.ServiceModel.Channels;
+
+namespace Apache.NMS.WCF
+{
+       /// <summary>
+       /// Server-side implementation of the sessioned one-way channel. Extends
+       /// <see cref="NmsInputChannel"/> with a session object created once per channel instance.
+       /// </summary>
+       public class NmsInputSessionChannel : NmsInputChannel, IInputSessionChannel
+       {
+               #region Constructors
+
+               /// <summary>
+               /// Initializes a new instance of the <see cref="NmsInputSessionChannel"/> class.
+               /// </summary>
+               /// <param name="factory">The factory that was used to create the channel.</param>
+               /// <param name="localAddress">The local address of the channel.</param>
+               internal NmsInputSessionChannel(ChannelListenerBase factory, EndpointAddress localAddress)
+                       : base(factory, localAddress)
+               {
+               }
+
+               #endregion
+
+               #region ISessionChannel<IInputSession> Members
+
+               /// <summary>
+               /// Gets the session associated with this channel.
+               /// </summary>
+               /// <value>The session instance created together with this channel.</value>
+               public IInputSession Session
+               {
+                       get { return _session; }
+               }
+
+               /// <summary>
+               /// Internal implementation of a session, with tracking ID.
+               /// </summary>
+               // NOTE(review): ISession is spelled with its full namespace, presumably to keep it apart
+               // from an Apache.NMS type of the same name - confirm.
+               private class InputSession : IInputSession, System.ServiceModel.Channels.ISession
+               {
+                       // Assigned once, when the session object is created.
+                       private readonly string _sessionId = NmsChannelHelper.CreateUniqueSessionId();
+
+                       /// <summary>
+                       /// Gets the ID that uniquely identifies the session.
+                       /// </summary>
+                       /// <value>The ID obtained from <c>NmsChannelHelper.CreateUniqueSessionId</c> at construction.</value>
+                       public string Id
+                       {
+                               get { return _sessionId; }
+                       }
+               }
+
+               #endregion
+
+               #region Private members
+
+               // One session per channel instance; never reassigned.
+               private readonly IInputSession _session = new InputSession();
+
+               #endregion
+       }
+}

Added: 
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannelListener.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannelListener.cs?rev=726083&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannelListener.cs
 (added)
+++ 
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsInputSessionChannelListener.cs
 Fri Dec 12 10:25:52 2008
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.ServiceModel;
+using System.ServiceModel.Channels;
+using System.Text;
+using Apache.NMS.Util;
+
+namespace Apache.NMS.WCF
+{
+       /// <summary>
+       /// Server-side listener for sessioned input channels.
+       /// </summary>
+       public class NmsInputSessionChannelListener : 
ChannelListenerBase<IInputSessionChannel>, IChannel
+       {
+               #region Constructors
+
+               /// <summary>
+               /// Initializes a new instance of the <see cref="NmsInputSessionChannelListener"/> class.
+               /// </summary>
+               /// <remarks>
+               /// Creates the buffer manager, picks the message encoder factory (the one supplied through the
+               /// binding parameters, or <c>NmsConstants.DefaultMessageEncoderFactory</c> when none is present),
+               /// creates the channel queue and records the destination and listen URI.
+               /// </remarks>
+               /// <param name="transportElement">The binding element.</param>
+               /// <param name="context">The context.</param>
+               internal NmsInputSessionChannelListener(NmsTransportBindingElement transportElement, BindingContext context)
+                       : base(context.Binding)
+               {
+                       // The buffer manager takes its per-buffer limit as an int. Clamp instead of casting
+                       // blindly so that a configured size above int.MaxValue cannot wrap around.
+                       // NOTE(review): assumes MaxReceivedMessageSize is wider than int (the original cast suggests so) - confirm.
+                       int maxBufferSize = (int) Math.Min(transportElement.MaxReceivedMessageSize, int.MaxValue);
+                       _bufferManager = BufferManager.CreateBufferManager(transportElement.MaxBufferPoolSize, maxBufferSize);
+
+                       // Remove<T>() takes the encoding element out of the binding parameters as it reads it.
+                       MessageEncodingBindingElement messageEncoderBindingElement = context.BindingParameters.Remove<MessageEncodingBindingElement>();
+                       _messageEncoderFactory = (messageEncoderBindingElement != null)
+                               ? messageEncoderBindingElement.CreateMessageEncoderFactory()
+                               : NmsConstants.DefaultMessageEncoderFactory;
+
+                       _channelQueue = new InputQueue<IInputSessionChannel>();
+                       _currentChannelLock = new object();
+                       _destinationName = transportElement.Destination;
+                       _destinationType = transportElement.DestinationType;
+                       _uri = new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);
+                       Tracer.DebugFormat("Listening to {0} at {1}/{2}", _destinationType, _uri, _destinationName);
+               }
+
+               #endregion
+
+               #region Public properties
+
+               /// <summary>
+               /// Gets the message encoder factory selected when the listener was constructed.
+               /// </summary>
+               /// <value>The message encoder factory.</value>
+               public MessageEncoderFactory MessageEncoderFactory
+               {
+                       get
+                       {
+                               return _messageEncoderFactory;
+                       }
+               }
+
+               /// <summary>
+               /// Gets or sets the name of the destination; initialised from the transport binding element.
+               /// </summary>
+               /// <value>The destination.</value>
+               public string Destination
+               {
+                       get
+                       {
+                               return _destinationName;
+                       }
+                       set
+                       {
+                               _destinationName = value;
+                       }
+               }
+
+               /// <summary>
+               /// Gets or sets the type of the destination; initialised from the transport binding element.
+               /// </summary>
+               /// <value>The type of the destination.</value>
+               public DestinationType DestinationType
+               {
+                       get
+                       {
+                               return _destinationType;
+                       }
+                       set
+                       {
+                               _destinationType = value;
+                       }
+               }
+
+               #endregion
+
+               #region Implementation of CommunicationObject
+
+               /// <summary>
+               /// Handles a synchronous abort by running the same teardown as <see cref="OnClose"/>,
+               /// passing a zero timeout.
+               /// </summary>
+               /// <remarks>
+               /// Abort can be called at any time, so we can't assume that we've been Opened successfully
+               /// (and thus may not have any listen sockets).
+               /// </remarks>
+               protected override void OnAbort()
+               {
+                       TimeSpan noWait = TimeSpan.Zero;
+                       OnClose(noWait);
+               }
+
+               /// <summary>
+               /// Tears the listener down when it is closed synchronously: closes the consumer, the session
+               /// and the connection (each only if it is not <c>null</c>) and finally the channel queue,
+               /// all while holding <c>ThisLock</c>.
+               /// </summary>
+               /// <remarks>
+               /// Each step runs inside its own <c>try</c>/<c>finally</c>, so a failure while closing one
+               /// resource no longer prevents the later resources and the channel queue from being closed.
+               /// The exception still propagates to the caller once the remaining steps have run.
+               /// </remarks>
+               /// <param name="timeout">The <see cref="T:System.TimeSpan" /> allotted for the close; this implementation does not read it.</param>
+               protected override void OnClose(TimeSpan timeout)
+               {
+                       lock(ThisLock)
+                       {
+                               try
+                               {
+                                       if(_consumer != null)
+                                       {
+                                               Tracer.Debug("Listener is terminating consumer...");
+                                               _consumer.Close();
+                                               _consumer.Dispose();
+                                               Tracer.Debug("Listener has terminated consumer");
+                                       }
+                               }
+                               finally
+                               {
+                                       try
+                                       {
+                                               if(_session != null)
+                                               {
+                                                       Tracer.Debug("Listener is terminating session...");
+                                                       _session.Close();
+                                                       Tracer.Debug("Listener has terminated session");
+                                               }
+                                       }
+                                       finally
+                                       {
+                                               try
+                                               {
+                                                       if(_connection != null)
+                                                       {
+                                                               Tracer.Debug("Listener is terminating connection...");
+                                                               _connection.Stop();
+                                                               _connection.Close();
+                                                               _connection.Dispose();
+                                                               Tracer.Debug("Listener has terminated connection");
+                                                       }
+                                               }
+                                               finally
+                                               {
+                                                       // Always close the channel queue, whatever happened above.
+                                                       _channelQueue.Close();
+                                               }
+                                       }
+                               }
+                       }
+               }
+
+               /// <summary>
+               /// Handles an asynchronous close by performing the close synchronously through
+               /// <see cref="OnClose"/> and then handing back a new <c>CompletedAsyncResult</c>.
+               /// </summary>
+               /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that is forwarded to <see cref="OnClose"/>.</param>
+               /// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate handed to the returned result.</param>
+               /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on close operation.</param>
+               /// <returns>
+               /// The <see cref="T:System.IAsyncResult" /> that references the asynchronous on close operation.
+               /// </returns>
+               protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
+               {
+                       OnClose(timeout);
+
+                       IAsyncResult finished = new CompletedAsyncResult(callback, state);
+                       return finished;
+               }
+
+               /// <summary>
+               /// Completes an asynchronous close by ending the supplied result through
+               /// <c>CompletedAsyncResult.End</c>.
+               /// </summary>
+               /// <param name="result">The <see cref="T:System.IAsyncResult" /> that was returned by <see cref="OnBeginClose"/>.</param>
+               protected override void OnEndClose(IAsyncResult result)
+               {
+                       // The close itself already ran in OnBeginClose; only the result is ended here.
+                       CompletedAsyncResult.End(result);
+               }
+
+               /// <summary>
+               /// Validates the listener's preconditions when it is opened synchronously: the listen URI
+               /// must be set, and the timeout must pass <c>NmsChannelHelper.ValidateTimeout</c>.
+               /// </summary>
+               /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on open operation has to complete before timing out.</param>
+               /// <exception cref="T:System.InvalidOperationException">The <see cref="Uri"/> property is <c>null</c>.</exception>
+               protected override void OnOpen(TimeSpan timeout)
+               {
+                       Uri listenUri = this.Uri;
+                       if(listenUri == null)
+                       {
+                               throw new InvalidOperationException("Uri must be set before ChannelListener is opened.");
+                       }
+
+                       // The URI is checked first; the timeout is validated only once that check has passed.
+                       NmsChannelHelper.ValidateTimeout(timeout);
+               }
+
+               /// <summary>
+               /// Handles an asynchronous open by validating the timeout, performing the open synchronously
+               /// through <see cref="OnOpen"/> and then handing back a new <c>CompletedAsyncResult</c>.
+               /// </summary>
+               /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that is validated and then forwarded to <see cref="OnOpen"/>.</param>
+               /// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate handed to the returned result.</param>
+               /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on open operation.</param>
+               /// <returns>
+               /// The <see cref="T:System.IAsyncResult" /> that references the asynchronous on open operation.
+               /// </returns>
+               protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
+               {
+                       // Validated here before anything else; OnOpen validates the timeout again after its URI check.
+                       NmsChannelHelper.ValidateTimeout(timeout);
+                       OnOpen(timeout);
+
+                       IAsyncResult finished = new CompletedAsyncResult(callback, state);
+                       return finished;
+               }
+
+               /// <summary>
+               /// Completes an asynchronous open by ending the supplied result through
+               /// <c>CompletedAsyncResult.End</c>.
+               /// </summary>
+               /// <param name="result">The <see cref="T:System.IAsyncResult" /> that was returned by <see cref="OnBeginOpen"/>.</param>
+               protected override void OnEndOpen(IAsyncResult result)
+               {
+                       // The open itself already ran in OnBeginOpen; only the result is ended here.
+                       CompletedAsyncResult.End(result);
+               }
+
+               #endregion
+
+               #region Implementation of ChannelListenerBase
+
+               /// <summary>
+               /// Gets the URI on which the channel listener listens for an incoming channel.
+               /// </summary>
+               /// <value>
+               /// The <see cref="T:System.Uri" /> built in the constructor from the binding context's
+               /// listen base address and relative address.
+               /// </value>
+               public override Uri Uri
+               {
+                       get
+                       {
+                               return _uri;
+                       }
+               }
+
+               /// <summary>
+               /// Waits synchronously for a channel by validating the timeout and then waiting for an
+               /// item on the channel queue.
+               /// </summary>
+               /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on wait for a channel operation has to complete before timing out.</param>
+               /// <returns>
+               /// The value returned by the channel queue's <c>WaitForItem</c>.
+               /// </returns>
+               protected override bool OnWaitForChannel(TimeSpan timeout)
+               {
+                       NmsChannelHelper.ValidateTimeout(timeout);
+
+                       bool waitOutcome = _channelQueue.WaitForItem(timeout);
+                       return waitOutcome;
+               }
+
+               /// <summary>
+               /// Starts waiting asynchronously for a channel by validating the timeout and then beginning
+               /// a wait for an item on the channel queue.
+               /// </summary>
+               /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the on begin wait operation has to complete before timing out.</param>
+               /// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate passed on to the channel queue.</param>
+               /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on begin wait operation.</param>
+               /// <returns>
+               /// The <see cref="T:System.IAsyncResult" /> returned by the channel queue's <c>BeginWaitForItem</c>.
+               /// </returns>
+               protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
+               {
+                       NmsChannelHelper.ValidateTimeout(timeout);
+
+                       IAsyncResult pendingWait = _channelQueue.BeginWaitForItem(timeout, callback, state);
+                       return pendingWait;
+               }
+
+               /// <summary>
+               /// Completes an asynchronous wait for a channel that was started by
+               /// <see cref="M:System.ServiceModel.Channels.ChannelListenerBase.OnBeginWaitForChannel(System.TimeSpan,System.AsyncCallback,System.Object)" />.
+               /// </summary>
+               /// <param name="result">The <see cref="T:System.IAsyncResult" /> returned by the begin call.</param>
+               /// <returns>
+               /// The value reported by the channel queue's <c>EndWaitForItem</c> call; per the overridden contract,
+               /// true if the wait completed before the timeout expired, otherwise false.
+               /// </returns>
+               protected override bool OnEndWaitForChannel(IAsyncResult result)
+               {
+                       bool channelArrived = _channelQueue.EndWaitForItem(result);
+                       return channelArrived;
+               }
+
+               /// <summary>
+               /// Accepts a channel by dequeuing it from the channel queue.
+               /// </summary>
+               /// <remarks>
+               /// Unless <c>IsDisposed</c> is set, a channel is created (if none is current) before the
+               /// dequeue is attempted.
+               /// </remarks>
+               /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the accept channel operation has to complete before timing out.</param>
+               /// <returns>
+               /// The <see cref="T:System.ServiceModel.Channels.IInputSessionChannel" /> taken from the channel queue.
+               /// </returns>
+               /// <exception cref="T:System.TimeoutException">The channel queue did not yield a channel for the given <paramref name="timeout" />.</exception>
+               protected override IInputSessionChannel OnAcceptChannel(TimeSpan timeout)
+               {
+                       Tracer.Debug("Accepting channel");
+                       NmsChannelHelper.ValidateTimeout(timeout);
+
+                       bool listenerDisposed = IsDisposed;
+                       if(!listenerDisposed)
+                       {
+                               EnsureChannelAvailable();
+                       }
+
+                       IInputSessionChannel acceptedChannel;
+                       bool dequeued = _channelQueue.Dequeue(timeout, out acceptedChannel);
+                       if(!dequeued)
+                       {
+                               // The failure text is built only once the dequeue has reported a timeout.
+                               string failure = String.Format("Accept on listener at address {0} timed out after {1}.", Uri.AbsoluteUri, timeout);
+                               throw new TimeoutException(failure);
+                       }
+
+                       return acceptedChannel;
+               }
+
+               /// <summary>
+               /// Starts an asynchronous accept by beginning a dequeue on the channel queue.
+               /// </summary>
+               /// <remarks>
+               /// Unless <c>IsDisposed</c> is set, a channel is created (if none is current) before the
+               /// asynchronous dequeue is started.
+               /// </remarks>
+               /// <param name="timeout">The <see cref="T:System.TimeSpan" /> that specifies how long the accept channel operation has to complete before timing out.</param>
+               /// <param name="callback">The <see cref="T:System.AsyncCallback" /> delegate passed on to the channel queue to be notified of completion.</param>
+               /// <param name="state">An object, specified by the application, that is passed on to the channel queue as the state of the asynchronous operation.</param>
+               /// <returns>
+               /// The <see cref="T:System.IAsyncResult" /> produced by the channel queue for the pending dequeue.
+               /// </returns>
+               protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
+               {
+                       NmsChannelHelper.ValidateTimeout(timeout);
+
+                       bool listenerDisposed = IsDisposed;
+                       if(!listenerDisposed)
+                       {
+                               EnsureChannelAvailable();
+                       }
+
+                       IAsyncResult pendingDequeue = _channelQueue.BeginDequeue(timeout, callback, state);
+                       return pendingDequeue;
+               }
+
+               /// <summary>
+               /// Completes the asynchronous acceptance of a channel that was started by
+               /// <see cref="M:System.ServiceModel.Channels.ChannelListenerBase`1.OnBeginAcceptChannel(System.TimeSpan,System.AsyncCallback,System.Object)" />.
+               /// </summary>
+               /// <param name="result">The <see cref="T:System.IAsyncResult" /> returned by the begin call.</param>
+               /// <returns>
+               /// The <see cref="T:System.ServiceModel.Channels.IInputSessionChannel" /> taken from the channel queue.
+               /// </returns>
+               /// <exception cref="T:System.TimeoutException">The channel queue reported that no channel was dequeued.</exception>
+               protected override IInputSessionChannel OnEndAcceptChannel(IAsyncResult result)
+               {
+                       IInputSessionChannel channel;
+                       if(_channelQueue.EndDequeue(result, out channel))
+                       {
+                               return channel;
+                       }
+
+                       // Name the listener address so the failure can be diagnosed, matching the synchronous
+                       // OnAcceptChannel path. The original timeout value is not available here.
+                       throw new TimeoutException(String.Format("Accept on listener at address {0} timed out.", Uri.AbsoluteUri));
+               }
+
+               #endregion
+
+               /// <summary>
+               /// Callback entry point that forwards its state object to <c>Dispatch</c>.
+               /// </summary>
+               /// <param name="state">The state object; it is cast to <c>Message</c> before being dispatched.</param>
+               internal void DispatchCallback(object state)
+               {
+                       Message incoming = (Message) state;
+                       Dispatch(incoming);
+               }
+
+               /// <summary>
+               /// Hands an incoming message to the current channel, creating that channel first
+               /// when none exists.
+               /// </summary>
+               /// <remarks>
+               /// A null message is ignored. When the channel had to be created for this message, it is
+               /// also placed on the channel queue after the message has been dispatched to it.
+               /// Any exception raised along the way is logged and not rethrown.
+               /// </remarks>
+               /// <param name="message">The message to dispatch; may be null.</param>
+               internal void Dispatch(Message message)
+               {
+                       if(message == null)
+                       {
+                               return;
+                       }
+
+                       try
+                       {
+                               NmsInputSessionChannel targetChannel;
+                               bool isNewChannel = CreateOrRetrieveChannel(out targetChannel);
+
+                               Tracer.Debug("Dispatching incoming message");
+                               targetChannel.Dispatch(message);
+
+                               if(!isNewChannel)
+                               {
+                                       // The channel was already current, so it is not enqueued again.
+                                       return;
+                               }
+
+                               // Hand the channel off to whoever is waiting for AcceptChannel() to complete
+                               Tracer.Debug("Handing off channel");
+                               _channelQueue.EnqueueAndDispatch(targetChannel);
+                       }
+                       catch(Exception dispatchError)
+                       {
+                               Tracer.ErrorFormat("Error dispatching Message: {0}", dispatchError.ToString());
+                       }
+               }
+
+               /// <summary>
+               /// Returns the current channel, creating and registering a new one when there is none.
+               /// </summary>
+               /// <remarks>
+               /// The current-channel field is checked once without the lock and again inside it
+               /// (double-checked locking), so the lock is only taken when no channel appears to exist.
+               /// </remarks>
+               /// <param name="newChannel">Receives the current channel, or the newly created one.</param>
+               /// <returns>true if this call created the channel; false if an existing channel was returned.</returns>
+               private bool CreateOrRetrieveChannel(out NmsInputSessionChannel 
newChannel)
+               {
+                       bool channelCreated = false;
+
+                       // First check without the lock.
+                       if((newChannel = _currentChannel) == null)
+                       {
+                               lock(_currentChannelLock)
+                               {
+                                       // Re-check under the lock: the field may have been set since the first check.
+                                       if((newChannel = _currentChannel) == 
null)
+                                       {
+                                               newChannel = 
CreateNmsChannel(Uri);
+                                               // Subscribe so OnChannelClosed can clear _currentChannel when this channel closes.
+                                               newChannel.Closed += 
OnChannelClosed;
+                                               _currentChannel = newChannel;
+                                               channelCreated = true;
+                                       }
+                               }
+                       }
+
+                       return channelCreated;
+               }
+
+               /// <summary>
+               /// Handles the Closed event of a channel by clearing the current-channel
+               /// reference when the closed channel is the current one.
+               /// </summary>
+               /// <param name="sender">The channel that raised the event; it is cast to <c>NmsInputSessionChannel</c>.</param>
+               /// <param name="args">The <see cref="System.EventArgs"/> instance containing the event data (unused).</param>
+               private void OnChannelClosed(object sender, EventArgs args)
+               {
+                       NmsInputSessionChannel closedChannel = (NmsInputSessionChannel) sender;
+
+                       lock(_currentChannelLock)
+                       {
+                               bool isCurrent = closedChannel == _currentChannel;
+                               if(isCurrent)
+                               {
+                                       _currentChannel = null;
+                               }
+                       }
+               }
+
+               /// <summary>
+               /// Opens the connection, session, destination and consumer for the listener and then
+               /// creates the <c>NmsInputSessionChannel</c> that will receive inbound messages.
+               /// </summary>
+               /// <param name="uri">The URI used both to open the connection and as the channel's endpoint address.</param>
+               /// <returns>A new channel bound to this listener and to <paramref name="uri" />.</returns>
+               private NmsInputSessionChannel CreateNmsChannel(Uri uri)
+               {
+                       // Each step depends on the result of the previous one, so the order is fixed.
+                       _connection = OpenConnection(uri);
+                       _session = OpenSession(_connection);
+                       _destination = SessionUtil.GetDestination(_session, Destination, DestinationType);
+                       _consumer = CreateConsumer(_session, _destination);
+
+                       return new NmsInputSessionChannel(this, new EndpointAddress(uri));
+               }
+
+               /// <summary>
+               /// Creates and starts a connection to the message broker.
+               /// </summary>
+               /// <param name="uri">The URI handed to the connection factory manager.</param>
+               /// <returns>The started connection, with <c>OnExceptionThrown</c> attached as its
+               /// exception listener.</returns>
+               private IConnection OpenConnection(Uri uri)
+               {
+                       IConnection brokerConnection = ConnectionFactoryManager.GetInstance().CreateConnection(uri);
+
+                       // The exception listener is attached before the connection is started.
+                       brokerConnection.ExceptionListener += OnExceptionThrown;
+                       brokerConnection.Start();
+
+                       Tracer.Debug("Connection open");
+                       return brokerConnection;
+               }
+
+               /// <summary>
+               /// Creates a session on an already started connection.
+               /// </summary>
+               /// <param name="connection">The connection to the message broker.</param>
+               /// <returns>The session created by <paramref name="connection" />.</returns>
+               /// <exception cref="InvalidOperationException">The <paramref name="connection" /> reports
+               /// that it has not been started.</exception>
+               private ISession OpenSession(IConnection connection)
+               {
+                       if(connection.IsStarted)
+                       {
+                               Tracer.Debug("Opening session...");
+                               ISession openedSession = connection.CreateSession();
+                               Tracer.Debug("Session open");
+                               return openedSession;
+                       }
+
+                       throw new InvalidOperationException("The connection has not yet been opened");
+               }
+
+               /// <summary>
+               /// Creates a message consumer for <paramref name="destination"/> on the
+               /// <paramref name="session"/> and attaches the receive handler to it.
+               /// </summary>
+               /// <param name="session">The session used to create the consumer.</param>
+               /// <param name="destination">The destination to consume from.</param>
+               /// <returns>The consumer, with <c>OnReceiveMessage</c> attached as its listener.</returns>
+               private IMessageConsumer CreateConsumer(ISession session, IDestination destination)
+               {
+                       Tracer.Debug("Creating message listener...");
+
+                       IMessageConsumer messageConsumer = session.CreateConsumer(destination);
+                       messageConsumer.Listener += OnReceiveMessage;
+
+                       Tracer.Debug("Created message listener");
+                       return messageConsumer;
+               }
+
+               /// <summary>
+               /// Event handler that processes a received message.
+               /// </summary>
+               /// <remarks>
+               /// The text of the NMS message is encoded as UTF-8 into a buffer taken from the buffer
+               /// manager, read into a WCF message by the configured encoder, and then dispatched.
+               /// </remarks>
+               /// <param name="message">The received message; it is cast to <c>ITextMessage</c>.</param>
+               private void OnReceiveMessage(IMessage message)
+               {
+                       Tracer.Debug("Decoding message");
+                       string soapMsg = ((ITextMessage) message).Text;
+
+                       // UTF-8 preserves characters outside the 7-bit range; Encoding.ASCII would
+                       // replace each of them with '?' and corrupt the SOAP payload.
+                       byte[] encoded = Encoding.UTF8.GetBytes(soapMsg);
+                       int dataLength = encoded.Length;
+
+                       // The pooled buffer may be larger than requested, so only the first
+                       // dataLength bytes are exposed through the segment.
+                       byte[] pooled = _bufferManager.TakeBuffer(dataLength);
+                       Array.Copy(encoded, pooled, dataLength);
+                       ArraySegment<byte> data = new ArraySegment<byte>(pooled, 0, dataLength);
+
+                       Message msg = _messageEncoderFactory.Encoder.ReadMessage(data, _bufferManager);
+
+                       Tracer.Debug(msg);
+                       Dispatch(msg);
+               }
+
+               /// <summary>
+               /// Exception listener attached to the broker connection.
+               /// </summary>
+               /// <remarks>
+               /// <see cref="NMSException" />s are logged and swallowed; every other exception is
+               /// thrown again from this handler.
+               /// </remarks>
+               /// <param name="exception">The exception that was reported.</param>
+               private void OnExceptionThrown(Exception exception)
+               {
+                       if(!(exception is NMSException))
+                       {
+                               // TODO: can we recover from the exception? Do we convert to WCF exceptions?
+                               // NOTE(review): "throw exception;" restarts the stack trace at this point.
+                               throw exception;
+                       }
+
+                       Tracer.ErrorFormat("{0} thrown : {1}\n{2}",
+                               exception.GetType().Name,
+                               exception.Message,
+                               exception.StackTrace);
+               }
+
+               /// <summary>
+               /// Makes sure a current channel exists; a channel created by this call is also
+               /// placed on the channel queue.
+               /// </summary>
+               private void EnsureChannelAvailable()
+               {
+                       NmsInputSessionChannel channel;
+                       bool created = CreateOrRetrieveChannel(out channel);
+                       if(!created)
+                       {
+                               // An existing channel was returned; it is not enqueued again.
+                               return;
+                       }
+
+                       _channelQueue.EnqueueAndDispatch(channel);
+               }
+
+               #region Private members
+
+               // Address returned by the Uri property.
+               private readonly Uri _uri;
+               // Broker connection, session, destination and consumer; all four are assigned in CreateNmsChannel.
+               private IConnection _connection;
+               private ISession _session;
+               private IDestination _destination;
+               private IMessageConsumer _consumer;
+               // Queue from which the accept/wait operations take channels.
+               private readonly InputQueue<IInputSessionChannel> _channelQueue;
+               // Channel currently receiving dispatched messages; written while holding _currentChannelLock.
+               private NmsInputSessionChannel _currentChannel;
+               private readonly object _currentChannelLock;
+               // Encoder factory and buffer manager used by OnReceiveMessage to read incoming messages.
+               private readonly MessageEncoderFactory _messageEncoderFactory;
+               private readonly BufferManager _bufferManager;
+               // NOTE(review): presumably the backing fields of the Destination / DestinationType members
+               // used in CreateNmsChannel - confirm against the part of the class not shown here.
+               private string _destinationName;
+               private DestinationType _destinationType;
+
+               #endregion
+       }
+}

Modified: 
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannel.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannel.cs?rev=726083&r1=726082&r2=726083&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannel.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannel.cs
 Fri Dec 12 10:25:52 2008
@@ -20,28 +20,30 @@
 using System.ServiceModel.Channels;
 using System.Text;
 using System.Xml;
+using Apache.NMS.Util;
 
 namespace Apache.NMS.WCF
 {
        /// <summary>
-       /// Channel for sending messages.
+       /// Client-side implementation of the sessionless one-way channel.
        /// </summary>
-       public class NmsOutputChannel : NmsChannelBase, IOutputChannel
+       public class NmsOutputChannel : NmsOutputChannelBase, IOutputChannel
        {
                #region Constructors
 
                /// <summary>
                /// Initializes a new instance of the <see 
cref="NmsOutputChannel"/> class.
                /// </summary>
+               /// <param name="factory">The factory that created the 
channel.</param>
+               /// <param name="remoteAddress">The remote address of the 
channel.</param>
+               /// <param name="via">The URI that contains the transport 
address to which messages are sent on the output channel.</param>
                /// <param name="bufferManager">The buffer manager.</param>
                /// <param name="encoderFactory">The encoder factory.</param>
-               /// <param name="address">The address.</param>
-               /// <param name="parent">The parent.</param>
-               /// <param name="via">The via.</param>
-               public NmsOutputChannel(BufferManager bufferManager, 
MessageEncoderFactory encoderFactory, EndpointAddress address, 
NmsChannelFactory parent, Uri via)
-                       : base(bufferManager, encoderFactory, address, parent, 
parent.Destination, parent.DestinationType)
+               /// <param name="destination">The name of the ActiveMQ 
destination.</param>
+               /// <param name="destinationType">The type of the ActiveMQ 
destination (either a queue or a topic, permanent or temporary).</param>
+               public NmsOutputChannel(ChannelManagerBase factory, 
EndpointAddress remoteAddress, Uri via, BufferManager bufferManager, 
MessageEncoderFactory encoderFactory, string destination, DestinationType 
destinationType)
+                       : base(factory, remoteAddress, via, bufferManager, 
encoderFactory, destination, destinationType)
                {
-                       _via = via;
                        _connection = 
ConnectionFactoryManager.GetInstance().CreateConnection(via);
                        _connection.Start();
                }
@@ -67,16 +69,14 @@
                public void Send(Message message, TimeSpan timeout)
                {
                        ThrowIfDisposedOrNotOpen();
+                       RemoteAddress.ApplyTo(message);
 
                        using(NMS.ISession session = 
_connection.CreateSession())
                        {
-                               IDestination destination = 
NmsChannelHelper.GetDestination(session, Destination, DestinationType);
+                               IDestination destination = 
SessionUtil.GetDestination(session, Destination, DestinationType);
                                using(IMessageProducer producer = 
session.CreateProducer(destination))
                                {
                                        producer.Persistent = true;
-                                       message.Headers.To = RemoteAddress.Uri;
-                                       //TODO: check if this is synonymous 
with the above operation
-                                       //RemoteAddress.ApplyTo(message);
 
                                        ITextMessage request = 
session.CreateTextMessage(TranslateMessage(message));
                                        producer.Send(request);
@@ -94,7 +94,7 @@
                /// <param name="message">The message to be translated.</param>
                private string TranslateMessage(Message message)
                {
-                       return (Encoder.MessageVersion == MessageVersion.Soap11)
+                       return (this.Encoder.MessageVersion == 
MessageVersion.Soap11)
                                ? TranslateMessageAsSoap11(message)
                                : TranslateMessageAsSoap12(message);
                }
@@ -169,17 +169,6 @@
                        NmsAsyncResult.End(result);
                }
 
-               /// <summary>
-               /// Gets the URI that contains the transport address to which 
messages are sent on the output channel.
-               /// </summary>
-               /// <returns>
-               /// The <see cref="T:System.Uri" /> that contains the transport 
address to which messages are sent on the output channel.
-               /// </returns>
-               public Uri Via
-               {
-                       get { return _via; }
-               }
-
                #endregion
 
                #region Implementation of CommunicationObject
@@ -307,9 +296,8 @@
 
                #region Private members
 
-               private readonly Uri _via;
                private readonly IConnection _connection;
 
                #endregion
        }
-}
\ No newline at end of file
+}

Added: 
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannelBase.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannelBase.cs?rev=726083&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannelBase.cs
 (added)
+++ 
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputChannelBase.cs
 Fri Dec 12 10:25:52 2008
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.ServiceModel;
+using System.ServiceModel.Channels;
+
+namespace Apache.NMS.WCF
+{
+       /// <summary>
+       /// Base class for NMS output channels.
+       /// </summary>
+       /// <remarks>
+       /// Stores the addressing, buffering, encoding and destination settings passed to the
+       /// constructor and exposes them through read-only properties. The open, close and abort
+       /// hooks of this class perform no work of their own.
+       /// </remarks>
+       public abstract class NmsOutputChannelBase : ChannelBase
+       {
+               #region Constructors
+
+               /// <summary>
+               /// Initializes a new instance of the <see cref="NmsOutputChannelBase"/> class.
+               /// </summary>
+               /// <param name="factory">The factory that created the channel.</param>
+               /// <param name="remoteAddress">The remote address for the channel.</param>
+               /// <param name="via">The URI that contains the transport address to which messages are sent on the output channel.</param>
+               /// <param name="bufferManager">The buffer manager.</param>
+               /// <param name="encoderFactory">The encoder factory.</param>
+               /// <param name="destination">The name of the ActiveMQ destination.</param>
+               /// <param name="destinationType">The type of the ActiveMQ destination (either a queue or a topic, permanent or temporary).</param>
+               internal NmsOutputChannelBase(ChannelManagerBase factory, EndpointAddress remoteAddress, Uri via, BufferManager bufferManager, MessageEncoderFactory encoderFactory, string destination, DestinationType destinationType)
+                       : base(factory)
+               {
+                       _remoteAddress = remoteAddress;
+                       _via = via;
+                       _bufferManager = bufferManager;
+                       _encoderFactory = encoderFactory;
+                       _destination = destination;
+                       _destinationType = destinationType;
+               }
+
+               #endregion
+
+               #region NullRequestContextCollection
+
+               // Placeholder for the request/reply pattern (not enabled).
+               //public NmsAsyncRequestContextCollection PendingRequests
+               //{
+               //    get { return _pendingRequests; }
+               //}
+
+               #endregion
+
+               #region Public properties
+
+               /// <summary>
+               /// Gets the remote address.
+               /// </summary>
+               /// <value>The remote address.</value>
+               public EndpointAddress RemoteAddress
+               {
+                       get { return _remoteAddress; }
+               }
+
+               /// <summary>
+               /// Gets the routing address.
+               /// </summary>
+               /// <value>The routing address.</value>
+               public Uri Via
+               {
+                       get { return _via; }
+               }
+
+               /// <summary>
+               /// Gets the buffer manager.
+               /// </summary>
+               /// <value>The buffer manager.</value>
+               public BufferManager BufferManager
+               {
+                       get { return _bufferManager; }
+               }
+
+               /// <summary>
+               /// Gets the encoder provided by the encoder factory passed to the constructor.
+               /// </summary>
+               /// <value>The encoder.</value>
+               public MessageEncoder Encoder
+               {
+                       get { return _encoderFactory.Encoder; }
+               }
+
+               /// <summary>
+               /// Gets the name of the destination (either a queue or a topic).
+               /// </summary>
+               /// <value>The name of the destination.</value>
+               public string Destination
+               {
+                       get { return _destination; }
+               }
+
+               /// <summary>
+               /// Gets the type of the destination.
+               /// </summary>
+               /// <value>The type of the destination.</value>
+               public DestinationType DestinationType
+               {
+                       get { return _destinationType; }
+               }
+
+               #endregion
+
+               #region Abort
+
+               /// <summary>
+               /// Inserts processing on a communication object after it transitions to the closing state due to the invocation of a synchronous abort operation.
+               /// This implementation does nothing.
+               /// </summary>
+               protected override void OnAbort()
+               {
+                       //_pendingRequests.AbortAll();
+               }
+
+               #endregion
+
+               #region Close
+
+               /// <summary>
+               /// Inserts processing after a communication object transitions to the closing state due to the invocation of an asynchronous close operation.
+               /// </summary>
+               /// <remarks>
+               /// The close is performed synchronously by calling <c>OnClose</c>; the returned result is already complete.
+               /// </remarks>
+               /// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies how long the on close operation has to complete before timing out.</param>
+               /// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate that receives notification of the completion of the asynchronous on close operation.</param>
+               /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on close operation.</param>
+               /// <returns>
+               /// The <see cref="T:System.IAsyncResult"/> that references the asynchronous on close operation.
+               /// </returns>
+               protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
+               {
+                       OnClose(timeout);
+                       return new CompletedAsyncResult(callback, state);
+               }
+
+               /// <summary>
+               /// Inserts processing on a communication object after it transitions to the closing state due to the invocation of a synchronous close operation.
+               /// This implementation does nothing.
+               /// </summary>
+               /// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies how long the on close operation has to complete before timing out.</param>
+               protected override void OnClose(TimeSpan timeout)
+               {
+                       //_pendingRequests.AbortAll();
+               }
+
+               /// <summary>
+               /// Completes an asynchronous operation on the close of a communication object.
+               /// </summary>
+               /// <param name="result">The <see cref="T:System.IAsyncResult"/> that is returned by a call to the <see cref="M:System.ServiceModel.Channels.CommunicationObject.OnBeginClose(System.TimeSpan,System.AsyncCallback,System.Object)"/> method.</param>
+               protected override void OnEndClose(IAsyncResult result)
+               {
+                       CompletedAsyncResult.End(result);
+               }
+
+               #endregion
+
+               #region Open
+
+               /// <summary>
+               /// Inserts processing on a communication object after it transitions to the opening state due to the invocation of an asynchronous open operation.
+               /// </summary>
+               /// <remarks>
+               /// The open is performed synchronously by calling <c>OnOpen</c>; the returned result is already complete.
+               /// </remarks>
+               /// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies how long the on open operation has to complete before timing out.</param>
+               /// <param name="callback">The <see cref="T:System.AsyncCallback"/> delegate that receives notification of the completion of the asynchronous on open operation.</param>
+               /// <param name="state">An object, specified by the application, that contains state information associated with the asynchronous on open operation.</param>
+               /// <returns>
+               /// The <see cref="T:System.IAsyncResult"/> that references the asynchronous on open operation.
+               /// </returns>
+               protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
+               {
+                       OnOpen(timeout);
+                       return new CompletedAsyncResult(callback, state);
+               }
+
+               /// <summary>
+               /// Inserts processing on a communication object after it transitions into the opening state which must complete within a specified interval of time.
+               /// This implementation does nothing.
+               /// </summary>
+               /// <param name="timeout">The <see cref="T:System.TimeSpan"/> that specifies how long the on open operation has to complete before timing out.</param>
+               protected override void OnOpen(TimeSpan timeout)
+               {
+               }
+
+               /// <summary>
+               /// Completes an asynchronous operation on the open of a communication object.
+               /// </summary>
+               /// <param name="result">The <see cref="T:System.IAsyncResult"/> that is returned by a call to the <see cref="M:System.ServiceModel.Channels.CommunicationObject.OnBeginOpen(System.TimeSpan,System.AsyncCallback,System.Object)"/> method.</param>
+               protected override void OnEndOpen(IAsyncResult result)
+               {
+                       CompletedAsyncResult.End(result);
+               }
+
+               #endregion
+
+               #region GetProperty
+
+               /// <summary>
+               /// Returns a typed object requested from the channel stack.
+               /// </summary>
+               /// <remarks>
+               /// A request for <see cref="T:System.ServiceModel.Channels.FaultConverter"/> is answered with the
+               /// default fault converter for the message version of this channel's encoder; every other
+               /// request is passed to the base class.
+               /// </remarks>
+               /// <typeparam name="T">The type of the requested property.</typeparam>
+               /// <returns>The requested object, or whatever the base class returns for types not handled here.</returns>
+               public override T GetProperty<T>()
+               {
+                       if(typeof(T) == typeof(FaultConverter))
+                       {
+                               return FaultConverter.GetDefaultFaultConverter(GetMessageVersion()) as T;
+                       }
+                       return base.GetProperty<T>();
+               }
+
+               /// <summary>
+               /// Determines the message version used for fault conversion.
+               /// </summary>
+               /// <returns>
+               /// The encoder's message version when an encoder is available; otherwise
+               /// <c>MessageVersion.Soap12WSAddressing10</c>, the value that was previously hard-coded.
+               /// </returns>
+               private MessageVersion GetMessageVersion()
+               {
+                       MessageEncoder encoder = (_encoderFactory != null) ? _encoderFactory.Encoder : null;
+                       if(encoder != null && encoder.MessageVersion != null)
+                       {
+                               return encoder.MessageVersion;
+                       }
+                       return MessageVersion.Soap12WSAddressing10;
+               }
+
+               #endregion
+
+               #region Private members
+
+               // All of these are assigned once, in the constructor.
+               private readonly EndpointAddress _remoteAddress;
+               private readonly Uri _via;
+               private readonly BufferManager _bufferManager;
+               private readonly MessageEncoderFactory _encoderFactory;
+               private readonly string _destination;
+               private readonly DestinationType _destinationType;
+
+               // for request/reply pattern
+               //NullAsyncRequestContextCollection _pendingRequests = new NullAsyncRequestContextCollection();
+
+               #endregion
+       }
+}

Added: 
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputSessionChannel.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputSessionChannel.cs?rev=726083&view=auto
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputSessionChannel.cs
 (added)
+++ 
activemq/activemq-dotnet/Apache.NMS.WCF/trunk/src/main/csharp/NmsOutputSessionChannel.cs
 Fri Dec 12 10:25:52 2008
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.ServiceModel;
+using System.ServiceModel.Channels;
+
+namespace Apache.NMS.WCF
+{
+       /// <summary>
+       /// Client-side sessioned one-way channel: an <see cref="NmsOutputChannel"/> that also exposes an <see cref="IOutputSession"/>.
+       /// </summary>
+       internal class NmsOutputSessionChannel : NmsOutputChannel, IOutputSessionChannel
+       {
+               #region Constructors
+
+               /// <summary>
+               /// Initializes a new instance of the <see cref="NmsOutputSessionChannel"/> class; every argument is forwarded to the base channel.
+               /// </summary>
+               /// <param name="factory">The factory that created this channel.</param>
+               /// <param name="via">The URI that contains the transport address to which messages are sent on the output channel.</param>
+               /// <param name="address">The address of this channel.</param>
+               /// <param name="bufferManager">The buffer manager.</param>
+               /// <param name="encoderFactory">The encoder factory.</param>
+               /// <param name="destination">The name of the ActiveMQ destination.</param>
+               /// <param name="destinationType">The type of the ActiveMQ destination (either a queue or a topic, permanent or temporary).</param>
+               public NmsOutputSessionChannel(ChannelManagerBase factory, Uri via, EndpointAddress address, BufferManager bufferManager, MessageEncoderFactory encoderFactory, string destination, DestinationType destinationType)
+                       : base(factory, address, via, bufferManager, encoderFactory, destination, destinationType) // address and via are handed to the base in the opposite order from this signature
+               {
+               }
+
+               #endregion
+
+               #region ISessionChannel<IOutputSession> Members
+
+               /// <summary>
+               /// Gets the session associated with this channel.
+               /// </summary>
+               /// <value>The <see cref="IOutputSession"/> created together with this channel instance.</value>
+               /// <remarks>The backing field is read-only, so the same session object is returned on every call.</remarks>
+               public IOutputSession Session
+               {
+                       get { return _session; }
+               }
+
+               /// <summary>
+               /// Internal implementation of a session, with tracking ID.
+               /// </summary>
+               private sealed class OutputSession : IOutputSession, System.ServiceModel.Channels.ISession
+               {
+                       private readonly string _sessionId = NmsChannelHelper.CreateUniqueSessionId();
+
+                       /// <summary>
+                       /// Gets the ID that identifies the session.
+                       /// </summary>
+                       /// <value>The ID obtained from <see cref="NmsChannelHelper.CreateUniqueSessionId"/> when the session object was created.</value>
+                       /// <remarks>The backing field is read-only, so the value does not change for the lifetime of the session object.</remarks>
+                       public string Id
+                       {
+                               get { return _sessionId; }
+                       }
+               }
+
+               #endregion
+
+               #region Private members
+
+               private readonly IOutputSession _session = new OutputSession();
+
+               #endregion
+       }
+}

Modified: activemq/activemq-dotnet/Apache.NMS.WCF/trunk/vs2008-nms-wcf.csproj
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.WCF/trunk/vs2008-nms-wcf.csproj?rev=726083&r1=726082&r2=726083&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.WCF/trunk/vs2008-nms-wcf.csproj 
(original)
+++ activemq/activemq-dotnet/Apache.NMS.WCF/trunk/vs2008-nms-wcf.csproj Fri Dec 
12 10:25:52 2008
@@ -66,12 +66,13 @@
     </NoWarn>
   </PropertyGroup>
   <ItemGroup>
-    <Reference Include="Apache.NMS, Version=1.1.0.0, Culture=neutral, 
processorArchitecture=MSIL">
+    <Reference Include="Apache.NMS, Version=1.1.0.0, Culture=neutral, 
PublicKeyToken=2a329723af30bc8d, processorArchitecture=MSIL">
       <SpecificVersion>False</SpecificVersion>
       <HintPath>lib\Apache.NMS\net-3.5\Apache.NMS.dll</HintPath>
     </Reference>
     <Reference Include="System" />
     <Reference Include="System.configuration" />
+    <Reference Include="System.Data" />
     <Reference Include="System.Runtime.Serialization">
       <RequiredTargetFramework>3.0</RequiredTargetFramework>
     </Reference>
@@ -121,12 +122,16 @@
     <Compile Include="src\main\csharp\ConnectionFactoryManager.cs" />
     <Compile Include="src\main\csharp\InputQueue.cs" />
     <Compile Include="src\main\csharp\NmsAsyncResult.cs" />
-    <Compile Include="src\main\csharp\NmsChannelBase.cs" />
+    <Compile Include="src\main\csharp\NmsInputQueueChannelBase.cs" />
     <Compile Include="src\main\csharp\NmsChannelFactory.cs" />
     <Compile Include="src\main\csharp\NmsChannelHelper.cs" />
-    <Compile Include="src\main\csharp\NmsChannelListener.cs" />
+    <Compile Include="src\main\csharp\NmsInputChannelListener.cs" />
     <Compile Include="src\main\csharp\NmsInputChannel.cs" />
+    <Compile Include="src\main\csharp\NmsInputSessionChannel.cs" />
+    <Compile Include="src\main\csharp\NmsInputSessionChannelListener.cs" />
     <Compile Include="src\main\csharp\NmsOutputChannel.cs" />
+    <Compile Include="src\main\csharp\NmsOutputChannelBase.cs" />
+    <Compile Include="src\main\csharp\NmsOutputSessionChannel.cs" />
   </ItemGroup>
   <Import Project="$(MSBuildBinPath)\Microsoft.CSHARP.Targets" />
   <PropertyGroup>
@@ -135,4 +140,4 @@
     <PostBuildEvent>cd $(ProjectDir)
 nant -nologo -q install-all -D:compile.skip=true</PostBuildEvent>
   </PropertyGroup>
-</Project>
\ No newline at end of file
+</Project>


Reply via email to