Author: tabish
Date: Wed Jan 21 14:48:51 2009
New Revision: 736458

URL: http://svn.apache.org/viewvc?rev=736458&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQNET-106

Added the methods to send a MessagePull command when prefetch is zero and the
dispatcher has no messages in its queue.

Modified:
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs?rev=736458&r1=736457&r2=736458&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Dispatcher.cs
 Wed Jan 21 14:48:51 2009
@@ -34,6 +34,11 @@
         AutoResetEvent messageReceivedEventHandle = new AutoResetEvent(false);
         bool m_bAsyncDelivery = false;
         bool m_bClosed = false;
+               
+               public bool isEmpty() 
+               {
+                       return this.queue.Count == 0;
+               }
 
                public void SetAsyncDelivery(AutoResetEvent eventHandle)
                {

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=736458&r1=736457&r2=736458&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageConsumer.cs
 Wed Jan 21 14:48:51 2009
@@ -99,18 +99,21 @@
                public IMessage Receive()
                {
                        CheckClosed();
+                       SendPullRequest(0);
                        return SetupAcknowledge(dispatcher.Dequeue());
                }
 
                public IMessage Receive(System.TimeSpan timeout)
                {
                        CheckClosed();
+                       SendPullRequest((long) timeout.TotalMilliseconds);
                        return SetupAcknowledge(dispatcher.Dequeue(timeout));
                }
 
                public IMessage ReceiveNoWait()
                {
                        CheckClosed();
+                       SendPullRequest(-1);
                        return SetupAcknowledge(dispatcher.DequeueNoWait());
                }
 
@@ -258,6 +261,22 @@
 
                        return message;
                }
+               
+               protected void SendPullRequest( long timeout ) 
+               {
+            CheckClosed();
+
+                       if(this.info.PrefetchSize == 0 && 
this.dispatcher.isEmpty())
+                       {
+                               MessagePull messagePull = new MessagePull();
+                               messagePull.ConsumerId = this.info.ConsumerId;
+                messagePull.Destination = this.info.Destination;
+                messagePull.Timeout = timeout;
+
+                               Tracer.Debug("Sending MessagePull: " + 
messagePull);
+                               session.Connection.OneWay(messagePull);
+                       }
+               }
 
                protected void DoNothingAcknowledge(ActiveMQMessage message)
                {


Reply via email to