Author: shuston
Date: Wed Dec 16 22:52:47 2009
New Revision: 891464

URL: http://svn.apache.org/viewvc?rev=891464&view=rev
Log:
Apply patches for QPID-2247.

Modified:
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs
    qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs?rev=891464&r1=891463&r2=891464&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs Wed Dec 
16 22:52:47 2009
@@ -32,7 +32,8 @@
         AmqpChannelProperties channelProperties;
         long maxBufferPoolSize;
         bool shared;
-       int prefetchLimit;
+           int prefetchLimit;
+        List<AmqpTransportChannel> openChannels;
 
         internal AmqpChannelFactory(AmqpTransportBindingElement 
bindingElement, BindingContext context)
             : base(context.Binding)
@@ -45,7 +46,7 @@
             Collection<MessageEncodingBindingElement> 
messageEncoderBindingElements
                 = 
context.BindingParameters.FindAll<MessageEncodingBindingElement>();
 
-            if(messageEncoderBindingElements.Count > 1)
+            if (messageEncoderBindingElements.Count > 1)
             {
                 throw new InvalidOperationException("More than one 
MessageEncodingBindingElement was found in the BindingParameters of the 
BindingContext");
             }
@@ -57,6 +58,8 @@
             {
                 this.messageEncoderFactory = new 
TextMessageEncodingBindingElement().CreateMessageEncoderFactory();
             }
+
+            openChannels = new List<AmqpTransportChannel>();
         }
 
 
@@ -93,8 +96,42 @@
 
         protected override TChannel OnCreateChannel(EndpointAddress 
remoteAddress, Uri via)
         {
-            return (TChannel)(object) new AmqpTransportChannel(this, 
this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, 
this.maxBufferPoolSize, this.shared, this.prefetchLimit);
+            AmqpTransportChannel channel = new AmqpTransportChannel(this, 
this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder, 
this.maxBufferPoolSize, this.shared, this.prefetchLimit);
+            lock (openChannels)
+            {
+                channel.Closed += new EventHandler(channel_Closed);
+                openChannels.Add(channel);
+            }
+            return (TChannel)(object) channel;
         }
 
+        void channel_Closed(object sender, EventArgs e)
+        {
+            if (this.State != CommunicationState.Opened)
+            {
+                return;
+            }
+
+            lock (openChannels)
+            {
+                AmqpTransportChannel channel = (AmqpTransportChannel)sender;
+                if (openChannels.Contains(channel))
+                {
+                    openChannels.Remove(channel);
+                }
+            }
+        }
+
+        protected override void OnClose(TimeSpan timeout)
+        {
+            base.OnClose(timeout);
+            lock (openChannels)
+            {
+                foreach (AmqpTransportChannel channel in openChannels)
+                {
+                    channel.Close(timeout);
+                }
+            }
+        }
     }
 }

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs?rev=891464&r1=891463&r2=891464&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs 
(original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs Wed Dec 
16 22:52:47 2009
@@ -130,15 +130,21 @@
 
         protected override IInputChannel OnAcceptChannel(TimeSpan timeout)
         {
+            if (this.IsDisposed)
+            {
+                return null;
+            }
+
             if (amqpTransportChannel == null)
             {
+                // TODO: add timeout processing
                 amqpTransportChannel = new AmqpTransportChannel(this, 
this.channelProperties,
                         new EndpointAddress(uri), 
messageEncoderFactory.Encoder,
                         maxBufferPoolSize, this.shared, this.prefetchLimit);
-                return (IInputChannel)(object) amqpTransportChannel;
+                return (IInputChannel)(object)amqpTransportChannel;
             }
 
-            // TODO: remove "max one channel" restriction,  add timeout 
processing
+            // Singleton channel.  Subsequent Accepts wait until the listener 
is closed
             acceptWaitEvent.WaitOne();
             return null;
         }
@@ -155,7 +161,11 @@
 
         protected override void OnClose(TimeSpan timeout)
         {
-            // TODO: (+ OnAbort)
+            if (amqpTransportChannel != null)
+            {
+                amqpTransportChannel.Close();
+            }
+            acceptWaitEvent.Set();
         }
 
         protected override IAsyncResult OnBeginClose(TimeSpan timeout, 
AsyncCallback callback, object state)
@@ -170,7 +180,9 @@
 
         protected override void OnAbort()
         {
-            // TODO:
+            if (amqpTransportChannel != null)
+                amqpTransportChannel.Abort();
+            acceptWaitEvent.Set();
         }
     }
 }

Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs?rev=891464&r1=891463&r2=891464&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs 
(original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs Wed Dec 
16 22:52:47 2009
@@ -278,7 +278,6 @@
 
         public bool TryReceive(TimeSpan timeout, out Message message)
         {
-            this.ThrowIfDisposedOrNotOpen();
             AmqpMessage amqpMessage;
             message = null;
 
@@ -379,6 +378,7 @@
         protected override void OnAbort()
         {
             //// TODO: check for network-less qpid teardown or launch special 
thread
+            this.CloseEndPoint();
             this.Cleanup();
         }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to