Author: tabish
Date: Wed Jan 21 14:48:51 2009
New Revision: 736458
URL: http://svn.apache.org/viewvc?rev=736458&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-106
Added the methods to send a MessagePull command when prefetch is zero and the
dispatcher has no messages in its queue.
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs?rev=736458&r1=736457&r2=736458&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs Wed Jan 21 14:48:51 2009
@@ -34,6 +34,11 @@
AutoResetEvent messageReceivedEventHandle = new AutoResetEvent(false);
bool m_bAsyncDelivery = false;
bool m_bClosed = false;
+
+ public bool isEmpty()
+ {
+ return this.queue.Count == 0;
+ }
public void SetAsyncDelivery(AutoResetEvent eventHandle)
{
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=736458&r1=736457&r2=736458&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs Wed Jan 21 14:48:51 2009
@@ -99,18 +99,21 @@
public IMessage Receive()
{
CheckClosed();
+ SendPullRequest(0);
return SetupAcknowledge(dispatcher.Dequeue());
}
public IMessage Receive(System.TimeSpan timeout)
{
CheckClosed();
+ SendPullRequest((long) timeout.TotalMilliseconds);
return SetupAcknowledge(dispatcher.Dequeue(timeout));
}
public IMessage ReceiveNoWait()
{
CheckClosed();
+ SendPullRequest(-1);
return SetupAcknowledge(dispatcher.DequeueNoWait());
}
@@ -258,6 +261,22 @@
return message;
}
+
+ protected void SendPullRequest( long timeout )
+ {
+ CheckClosed();
+
+ if(this.info.PrefetchSize == 0 && this.dispatcher.isEmpty())
+ {
+ MessagePull messagePull = new MessagePull();
+ messagePull.ConsumerId = this.info.ConsumerId;
+ messagePull.Destination = this.info.Destination;
+ messagePull.Timeout = timeout;
+
+ Tracer.Debug("Sending MessagePull: " + messagePull);
+ session.Connection.OneWay(messagePull);
+ }
+ }
protected void DoNothingAcknowledge(ActiveMQMessage message)
{