Author: shuston
Date: Wed Dec 16 22:52:47 2009
New Revision: 891464
URL: http://svn.apache.org/viewvc?rev=891464&view=rev
Log:
Apply patches for QPID-2247.
Modified:
qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs
qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs?rev=891464&r1=891463&r2=891464&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelFactory.cs Wed Dec 16 22:52:47 2009
@@ -32,7 +32,8 @@
AmqpChannelProperties channelProperties;
long maxBufferPoolSize;
bool shared;
- int prefetchLimit;
+ int prefetchLimit;
+ List<AmqpTransportChannel> openChannels;
internal AmqpChannelFactory(AmqpTransportBindingElement
bindingElement, BindingContext context)
: base(context.Binding)
@@ -45,7 +46,7 @@
Collection<MessageEncodingBindingElement>
messageEncoderBindingElements
=
context.BindingParameters.FindAll<MessageEncodingBindingElement>();
- if(messageEncoderBindingElements.Count > 1)
+ if (messageEncoderBindingElements.Count > 1)
{
throw new InvalidOperationException("More than one
MessageEncodingBindingElement was found in the BindingParameters of the
BindingContext");
}
@@ -57,6 +58,8 @@
{
this.messageEncoderFactory = new
TextMessageEncodingBindingElement().CreateMessageEncoderFactory();
}
+
+ openChannels = new List<AmqpTransportChannel>();
}
@@ -93,8 +96,42 @@
protected override TChannel OnCreateChannel(EndpointAddress
remoteAddress, Uri via)
{
- return (TChannel)(object) new AmqpTransportChannel(this,
this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder,
this.maxBufferPoolSize, this.shared, this.prefetchLimit);
+ AmqpTransportChannel channel = new AmqpTransportChannel(this,
this.channelProperties, remoteAddress, this.messageEncoderFactory.Encoder,
this.maxBufferPoolSize, this.shared, this.prefetchLimit);
+ lock (openChannels)
+ {
+ channel.Closed += new EventHandler(channel_Closed);
+ openChannels.Add(channel);
+ }
+ return (TChannel)(object) channel;
}
+ void channel_Closed(object sender, EventArgs e)
+ {
+ if (this.State != CommunicationState.Opened)
+ {
+ return;
+ }
+
+ lock (openChannels)
+ {
+ AmqpTransportChannel channel = (AmqpTransportChannel)sender;
+ if (openChannels.Contains(channel))
+ {
+ openChannels.Remove(channel);
+ }
+ }
+ }
+
+ protected override void OnClose(TimeSpan timeout)
+ {
+ base.OnClose(timeout);
+ lock (openChannels)
+ {
+ foreach (AmqpTransportChannel channel in openChannels)
+ {
+ channel.Close(timeout);
+ }
+ }
+ }
}
}
Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs?rev=891464&r1=891463&r2=891464&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpChannelListener.cs Wed Dec 16 22:52:47 2009
@@ -130,15 +130,21 @@
protected override IInputChannel OnAcceptChannel(TimeSpan timeout)
{
+ if (this.IsDisposed)
+ {
+ return null;
+ }
+
if (amqpTransportChannel == null)
{
+ // TODO: add timeout processing
amqpTransportChannel = new AmqpTransportChannel(this,
this.channelProperties,
new EndpointAddress(uri),
messageEncoderFactory.Encoder,
maxBufferPoolSize, this.shared, this.prefetchLimit);
- return (IInputChannel)(object) amqpTransportChannel;
+ return (IInputChannel)(object)amqpTransportChannel;
}
- // TODO: remove "max one channel" restriction, add timeout processing
+ // Singleton channel. Subsequent Accepts wait until the listener is closed
acceptWaitEvent.WaitOne();
return null;
}
@@ -155,7 +161,11 @@
protected override void OnClose(TimeSpan timeout)
{
- // TODO: (+ OnAbort)
+ if (amqpTransportChannel != null)
+ {
+ amqpTransportChannel.Close();
+ }
+ acceptWaitEvent.Set();
}
protected override IAsyncResult OnBeginClose(TimeSpan timeout,
AsyncCallback callback, object state)
@@ -170,7 +180,9 @@
protected override void OnAbort()
{
- // TODO:
+ if (amqpTransportChannel != null)
+ amqpTransportChannel.Abort();
+ acceptWaitEvent.Set();
}
}
}
Modified: qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs?rev=891464&r1=891463&r2=891464&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs (original)
+++ qpid/trunk/qpid/wcf/src/Apache/Qpid/Channel/AmqpTransportChannel.cs Wed Dec 16 22:52:47 2009
@@ -278,7 +278,6 @@
public bool TryReceive(TimeSpan timeout, out Message message)
{
- this.ThrowIfDisposedOrNotOpen();
AmqpMessage amqpMessage;
message = null;
@@ -379,6 +378,7 @@
protected override void OnAbort()
{
//// TODO: check for network-less qpid teardown or launch special thread
+ this.CloseEndPoint();
this.Cleanup();
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]