Author: tabish
Date: Wed Aug 11 21:35:26 2010
New Revision: 984591
URL: http://svn.apache.org/viewvc?rev=984591&view=rev
Log:
fix for: https://issues.apache.org/activemq/browse/AMQNET-270
Added:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs
(with props)
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs
(with props)
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=984591&r1=984590&r2=984591&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
Wed Aug 11 21:35:26 2010
@@ -44,6 +44,7 @@ namespace Apache.NMS.ActiveMQ
private bool sendAcksAsync = false;
private bool dispatchAsync = true;
private int producerWindowSize = 0;
+ private bool messagePrioritySupported=true;
private bool userSpecifiedClientID;
private readonly Uri brokerUri;
@@ -238,6 +239,17 @@ namespace Apache.NMS.ActiveMQ
set { this.useCompression = value; }
}
+ /// <summary>
+ /// Indicate whether or not the resources of this Connection should
support the
+ /// Message Priority value of incoming messages and dispatch them
accordingly.
+ /// When disabled Message are always dispatched to Consumers in FIFO
order.
+ /// </summary>
+ public bool MessagePrioritySupported
+ {
+ get { return this.messagePrioritySupported; }
+ set { this.messagePrioritySupported = value; }
+ }
+
public IConnectionMetaData MetaData
{
get { return this.metaData ?? (this.metaData = new
ConnectionMetaData()); }
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs?rev=984591&r1=984590&r2=984591&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs
Wed Aug 11 21:35:26 2010
@@ -53,6 +53,7 @@ namespace Apache.NMS.ActiveMQ
private int producerWindowSize = 0;
private AcknowledgementMode acknowledgementMode =
AcknowledgementMode.AutoAcknowledge;
private TimeSpan requestTimeout =
NMSConstants.defaultRequestTimeout;
+ private bool messagePrioritySupported=true;
private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
private PrefetchPolicy prefetchPolicy = new PrefetchPolicy();
@@ -255,7 +256,13 @@ namespace Apache.NMS.ActiveMQ
get { return this.dispatchAsync; }
set { this.dispatchAsync = value; }
}
-
+
+ public bool MessagePrioritySupported
+ {
+ get { return this.messagePrioritySupported; }
+ set { this.messagePrioritySupported = value; }
+ }
+
public int RequestTimeout
{
get { return
(int)this.requestTimeout.TotalMilliseconds; }
@@ -359,6 +366,7 @@ namespace Apache.NMS.ActiveMQ
connection.UseCompression = this.useCompression;
connection.RequestTimeout = this.requestTimeout;
connection.ProducerWindowSize = this.producerWindowSize;
+ connection.MessagePrioritySupported =
this.messagePrioritySupported;
connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as
IRedeliveryPolicy;
connection.PrefetchPolicy = this.prefetchPolicy.Clone() as
PrefetchPolicy;
connection.CompressionPolicy = this.compressionPolicy.Clone() as
ICompressionPolicy;
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=984591&r1=984590&r2=984591&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
Wed Aug 11 21:35:26 2010
@@ -39,7 +39,7 @@ namespace Apache.NMS.ActiveMQ
/// </summary>
public class MessageConsumer : IMessageConsumer, IDispatcher
{
- private readonly MessageDispatchChannel unconsumedMessages =
new MessageDispatchChannel();
+ private readonly MessageDispatchChannel unconsumedMessages;
private readonly LinkedList<MessageDispatch> dispatchedMessages
= new LinkedList<MessageDispatch>();
private readonly ConsumerInfo info;
private Session session;
@@ -79,7 +79,16 @@ namespace Apache.NMS.ActiveMQ
this.session = session;
this.redeliveryPolicy = this.session.Connection.RedeliveryPolicy;
-
+
+ if(session.Connection.MessagePrioritySupported)
+ {
+ this.unconsumedMessages = new
SimplePriorityMessageDispatchChannel();
+ }
+ else
+ {
+ this.unconsumedMessages = new FifoMessageDispatchChannel();
+ }
+
this.info = new ConsumerInfo();
this.info.ConsumerId = id;
this.info.Destination = destination;
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs?rev=984591&r1=984590&r2=984591&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/SessionExecutor.cs
Wed Aug 11 21:35:26 2010
@@ -25,7 +25,7 @@ namespace Apache.NMS.ActiveMQ
{
public class SessionExecutor : Threads.Task
{
- private MessageDispatchChannel messageQueue = new
MessageDispatchChannel();
+ private MessageDispatchChannel messageQueue = null;
private TaskRunner taskRunner = null;
private Session session = null;
@@ -35,6 +35,15 @@ namespace Apache.NMS.ActiveMQ
{
this.session = session;
this.consumers = consumers;
+
+ if(this.session.Connection != null &&
this.session.Connection.MessagePrioritySupported)
+ {
+ this.messageQueue = new SimplePriorityMessageDispatchChannel();
+ }
+ else
+ {
+ this.messageQueue = new FifoMessageDispatchChannel();
+ }
}
~SessionExecutor()
Added:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs?rev=984591&view=auto
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs
(added)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs
Wed Aug 11 21:35:26 2010
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ
+{
+ /// <summary>
+ /// A FIFO based MessageDispatchChannel.
+ /// </summary>
+ public class FifoMessageDispatchChannel : MessageDispatchChannel
+ {
+ private readonly Mutex mutex = new Mutex();
+ private bool closed;
+ private bool running;
+ private LinkedList<MessageDispatch> channel = new
LinkedList<MessageDispatch>();
+
+ #region Properties
+
+ public object SyncRoot
+ {
+ get{ return this.mutex; }
+ }
+
+ public bool Closed
+ {
+ get
+ {
+ lock(this.mutex)
+ {
+ return this.closed;
+ }
+ }
+
+ set
+ {
+ lock(this.mutex)
+ {
+ this.closed = value;
+ }
+ }
+ }
+
+ public bool Running
+ {
+ get
+ {
+ lock(this.mutex)
+ {
+ return this.running;
+ }
+ }
+
+ set
+ {
+ lock(this.mutex)
+ {
+ this.running = value;
+ }
+ }
+ }
+
+ public bool Empty
+ {
+ get
+ {
+ lock(mutex)
+ {
+ return channel.Count == 0;
+ }
+ }
+ }
+
+ public long Count
+ {
+ get
+ {
+ lock(mutex)
+ {
+ return channel.Count;
+ }
+ }
+ }
+
+ #endregion
+
+ public void Start()
+ {
+ lock(this.mutex)
+ {
+ if(!Closed)
+ {
+ this.running = true;
+ Monitor.PulseAll(this.mutex);
+ }
+ }
+ }
+
+ public void Stop()
+ {
+ lock(mutex)
+ {
+ this.running = false;
+ Monitor.PulseAll(this.mutex);
+ }
+ }
+
+ public void Close()
+ {
+ lock(mutex)
+ {
+ if(!Closed)
+ {
+ this.running = false;
+ this.closed = true;
+ }
+
+ Monitor.PulseAll(this.mutex);
+ }
+ }
+
+ public void Enqueue(MessageDispatch dispatch)
+ {
+ lock(this.mutex)
+ {
+ this.channel.AddLast(dispatch);
+ Monitor.Pulse(this.mutex);
+ }
+ }
+
+ public void EnqueueFirst(MessageDispatch dispatch)
+ {
+ lock(this.mutex)
+ {
+ this.channel.AddFirst(dispatch);
+ Monitor.Pulse(this.mutex);
+ }
+ }
+
+ public MessageDispatch Dequeue(TimeSpan timeout)
+ {
+ lock(this.mutex)
+ {
+ // Wait until the channel is ready to deliver messages.
+ if( timeout != TimeSpan.Zero && !Closed && ( Empty || !Running
) )
+ {
+ Monitor.Wait(this.mutex, timeout);
+ }
+
+ if( Closed || !Running || Empty )
+ {
+ return null;
+ }
+
+ return DequeueNoWait();
+ }
+ }
+
+ public MessageDispatch DequeueNoWait()
+ {
+ MessageDispatch result = null;
+
+ lock(this.mutex)
+ {
+ if( Closed || !Running || Empty )
+ {
+ return null;
+ }
+
+ result = channel.First.Value;
+ this.channel.RemoveFirst();
+ }
+
+ return result;
+ }
+
+ public MessageDispatch Peek()
+ {
+ lock(this.mutex)
+ {
+ if( Closed || !Running || Empty )
+ {
+ return null;
+ }
+
+ return channel.First.Value;
+ }
+ }
+
+ public void Clear()
+ {
+ lock(mutex)
+ {
+ this.channel.Clear();
+ }
+ }
+
+ public MessageDispatch[] RemoveAll()
+ {
+ MessageDispatch[] result;
+
+ lock(mutex)
+ {
+ result = new MessageDispatch[this.Count];
+ channel.CopyTo(result, 0);
+ channel.Clear();
+ }
+
+ return result;
+ }
+ }
+}
+
Propchange:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/FifoMessageDispatchChannel.cs
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs?rev=984591&r1=984590&r2=984591&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/MessageDispatchChannel.cs
Wed Aug 11 21:35:26 2010
@@ -22,205 +22,60 @@ using Apache.NMS.ActiveMQ.Commands;
namespace Apache.NMS.ActiveMQ
{
- public class MessageDispatchChannel
+ /// <summary>
+ /// Defines an interface for a Message Channel used to dispatch incoming
+ /// Messages to a Session or MessageConsumer. The implementation controls
+ /// how the messages are dequeued from the channel, one option is for a
+ /// FIFO ordering while another might be to sort the Message's based on the
+ /// set Message Priority.
+ /// </summary>
+ public interface MessageDispatchChannel
{
- private readonly Mutex mutex = new Mutex();
- private bool closed;
- private bool running;
- private LinkedList<MessageDispatch> channel = new
LinkedList<MessageDispatch>();
-
- #region Properties
-
- public object SyncRoot
+ object SyncRoot
{
- get{ return this.mutex; }
+ get;
}
- public bool Closed
- {
- get
- {
- lock(this.mutex)
- {
- return this.closed;
- }
- }
-
- set
- {
- lock(this.mutex)
- {
- this.closed = value;
- }
- }
- }
-
- public bool Running
- {
- get
- {
- lock(this.mutex)
- {
- return this.running;
- }
- }
-
- set
- {
- lock(this.mutex)
- {
- this.running = value;
- }
- }
- }
-
- public bool Empty
+ bool Closed
{
- get
- {
- lock(mutex)
- {
- return channel.Count == 0;
- }
- }
+ get;
+ set;
}
- public long Count
+ bool Running
{
- get
- {
- lock(mutex)
- {
- return channel.Count;
- }
- }
+ get;
+ set;
}
- #endregion
-
- public void Start()
+ bool Empty
{
- lock(this.mutex)
- {
- if(!Closed)
- {
- this.running = true;
- Monitor.PulseAll(this.mutex);
- }
- }
+ get;
}
- public void Stop()
+ long Count
{
- lock(mutex)
- {
- this.running = false;
- Monitor.PulseAll(this.mutex);
- }
+ get;
}
- public void Close()
- {
- lock(mutex)
- {
- if(!Closed)
- {
- this.running = false;
- this.closed = true;
- }
-
- Monitor.PulseAll(this.mutex);
- }
- }
-
- public void Enqueue(MessageDispatch dispatch)
- {
- lock(this.mutex)
- {
- this.channel.AddLast(dispatch);
- Monitor.Pulse(this.mutex);
- }
- }
+ void Start();
- public void EnqueueFirst(MessageDispatch dispatch)
- {
- lock(this.mutex)
- {
- this.channel.AddFirst(dispatch);
- Monitor.Pulse(this.mutex);
- }
- }
+ void Stop();
- public MessageDispatch Dequeue(TimeSpan timeout)
- {
- lock(this.mutex)
- {
- // Wait until the channel is ready to deliver messages.
- if( timeout != TimeSpan.Zero && !Closed && ( Empty || !Running
) )
- {
- Monitor.Wait(this.mutex, timeout);
- }
+ void Close();
- if( Closed || !Running || Empty )
- {
- return null;
- }
-
- return DequeueNoWait();
- }
- }
+ void Enqueue(MessageDispatch dispatch);
- public MessageDispatch DequeueNoWait()
- {
- MessageDispatch result = null;
-
- lock(this.mutex)
- {
- if( Closed || !Running || Empty )
- {
- return null;
- }
-
- result = channel.First.Value;
- this.channel.RemoveFirst();
- }
+ void EnqueueFirst(MessageDispatch dispatch);
- return result;
- }
+ MessageDispatch Dequeue(TimeSpan timeout);
- public MessageDispatch Peek()
- {
- lock(this.mutex)
- {
- if( Closed || !Running || Empty )
- {
- return null;
- }
-
- return channel.First.Value;
- }
- }
+ MessageDispatch DequeueNoWait();
- public void Clear()
- {
- lock(mutex)
- {
- this.channel.Clear();
- }
- }
+ MessageDispatch Peek();
- public MessageDispatch[] RemoveAll()
- {
- MessageDispatch[] result;
-
- lock(mutex)
- {
- result = new MessageDispatch[this.Count];
- channel.CopyTo(result, 0);
- channel.Clear();
- }
+ void Clear();
- return result;
- }
+ MessageDispatch[] RemoveAll();
}
}
Added:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs?rev=984591&view=auto
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs
(added)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs
Wed Aug 11 21:35:26 2010
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using Apache.NMS.ActiveMQ.Commands;
+
+namespace Apache.NMS.ActiveMQ
+{
+ public class SimplePriorityMessageDispatchChannel : MessageDispatchChannel
+ {
+ public const int MAX_PRIORITY = 10;
+
+ private readonly Mutex mutex = new Mutex();
+ private bool closed;
+ private bool running;
+ private LinkedList<MessageDispatch>[] channels = new
LinkedList<MessageDispatch>[MAX_PRIORITY];
+ private int size;
+
+ public SimplePriorityMessageDispatchChannel()
+ {
+ for(int i = 0; i < MAX_PRIORITY; ++i)
+ {
+ channels[i] = new LinkedList<MessageDispatch>();
+ }
+ }
+
+ #region Properties
+
+ public object SyncRoot
+ {
+ get{ return this.mutex; }
+ }
+
+ public bool Closed
+ {
+ get
+ {
+ lock(this.mutex)
+ {
+ return this.closed;
+ }
+ }
+
+ set
+ {
+ lock(this.mutex)
+ {
+ this.closed = value;
+ }
+ }
+ }
+
+ public bool Running
+ {
+ get
+ {
+ lock(this.mutex)
+ {
+ return this.running;
+ }
+ }
+
+ set
+ {
+ lock(this.mutex)
+ {
+ this.running = value;
+ }
+ }
+ }
+
+ public bool Empty
+ {
+ get
+ {
+ lock(mutex)
+ {
+ return this.size == 0;
+ }
+ }
+ }
+
+ public long Count
+ {
+ get
+ {
+ lock(mutex)
+ {
+ return this.size;
+ }
+ }
+ }
+
+ #endregion
+
+ public void Start()
+ {
+ lock(this.mutex)
+ {
+ if(!Closed)
+ {
+ this.running = true;
+ Monitor.PulseAll(this.mutex);
+ }
+ }
+ }
+
+ public void Stop()
+ {
+ lock(mutex)
+ {
+ this.running = false;
+ Monitor.PulseAll(this.mutex);
+ }
+ }
+
+ public void Close()
+ {
+ lock(mutex)
+ {
+ if(!Closed)
+ {
+ this.running = false;
+ this.closed = true;
+ }
+
+ Monitor.PulseAll(this.mutex);
+ }
+ }
+
+ public void Enqueue(MessageDispatch dispatch)
+ {
+ lock(this.mutex)
+ {
+ GetList(dispatch).AddLast(dispatch);
+ this.size++;
+ Monitor.Pulse(this.mutex);
+ }
+ }
+
+ public void EnqueueFirst(MessageDispatch dispatch)
+ {
+ lock(this.mutex)
+ {
+ GetList(dispatch).AddFirst(dispatch);
+ this.size++;
+ Monitor.Pulse(this.mutex);
+ }
+ }
+
+ public MessageDispatch Dequeue(TimeSpan timeout)
+ {
+ lock(this.mutex)
+ {
+ // Wait until the channel is ready to deliver messages.
+ if( timeout != TimeSpan.Zero && !Closed && ( Empty || !Running
) )
+ {
+ Monitor.Wait(this.mutex, timeout);
+ }
+
+ if( Closed || !Running || Empty )
+ {
+ return null;
+ }
+
+ return RemoveFirst();
+ }
+ }
+
+ public MessageDispatch DequeueNoWait()
+ {
+ MessageDispatch result = null;
+
+ lock(this.mutex)
+ {
+ if( Closed || !Running || Empty )
+ {
+ return null;
+ }
+
+ result = RemoveFirst();
+ }
+
+ return result;
+ }
+
+ public MessageDispatch Peek()
+ {
+ lock(this.mutex)
+ {
+ if( Closed || !Running || Empty )
+ {
+ return null;
+ }
+
+ return GetFirst();
+ }
+ }
+
+ public void Clear()
+ {
+ lock(mutex)
+ {
+ foreach(LinkedList<MessageDispatch> list in channels)
+ {
+ list.Clear();
+ }
+ }
+ }
+
+ public MessageDispatch[] RemoveAll()
+ {
+ MessageDispatch[] result;
+
+ lock(mutex)
+ {
+ result = new MessageDispatch[this.size];
+ int copyPos = 0;
+
+ for(int i = MAX_PRIORITY - 1; i >= 0; i--)
+ {
+ LinkedList<MessageDispatch> list = channels[i];
+ list.CopyTo(result, copyPos);
+ copyPos += list.Count;
+ size -= list.Count;
+ list.Clear();
+ }
+ }
+
+ return result;
+ }
+
+ protected int getPriority(MessageDispatch message)
+ {
+ int priority = (int) NMSConstants.defaultPriority;
+
+ if(message.Message != null)
+ {
+ priority = Math.Max((int) message.Message.Priority, 0);
+ priority = Math.Min(priority, 9);
+ }
+
+ return priority;
+ }
+
+ protected LinkedList<MessageDispatch> GetList(MessageDispatch md)
+ {
+ return channels[getPriority(md)];
+ }
+
+ private MessageDispatch RemoveFirst()
+ {
+ if(this.size > 0)
+ {
+ for(int i = MAX_PRIORITY - 1; i >= 0; i--)
+ {
+ LinkedList<MessageDispatch> list = channels[i];
+ if(list.Count != 0)
+ {
+ this.size--;
+ MessageDispatch dispatch = list.First.Value;
+ list.RemoveFirst();
+ return dispatch;
+ }
+ }
+ }
+ return null;
+ }
+
+ private MessageDispatch GetFirst()
+ {
+ if(this.size > 0)
+ {
+ for(int i = MAX_PRIORITY - 1; i >= 0; i--)
+ {
+ LinkedList<MessageDispatch> list = channels[i];
+ if(list.Count != 0)
+ {
+ return list.First.Value;
+ }
+ }
+ }
+ return null;
+ }
+ }
+}
+
Propchange:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Util/SimplePriorityMessageDispatchChannel.cs
------------------------------------------------------------------------------
svn:eol-style = native