Author: chirino
Date: Fri Sep  1 12:52:18 2006
New Revision: 439442

URL: http://svn.apache.org/viewvc?rev=439442&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-855 allow prefetch==0 to work 
with receive(timeout) and receiveNoWait()

Modified:
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=439442&r1=439441&r2=439442&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
 Fri Sep  1 12:52:18 2006
@@ -381,6 +381,8 @@
                     } else {
                         return null;
                     }
+                } else if ( md.getMessage()==null ) {
+                       return null;
                 } else if (md.getMessage().isExpired()) {
                     if (log.isDebugEnabled()) {
                         log.debug("Received expired message: " + md);
@@ -415,9 +417,10 @@
      *         this message consumer is concurrently closed
      */
     public Message receive() throws JMSException {
-        sendPullCommand();
         checkClosed();
         checkMessageListener();
+        
+        sendPullCommand(-1);
         MessageDispatch md = dequeue(-1);
         if (md == null)
             return null;
@@ -454,22 +457,29 @@
      * expires, and the call blocks indefinitely.
      * 
      * @param timeout
-     *            the timeout value (in milliseconds)
+     *            the timeout value (in milliseconds), a time out of zero 
never expires.
      * @return the next message produced for this message consumer, or null if
      *         the timeout expires or this message consumer is concurrently
      *         closed
      */
     public Message receive(long timeout) throws JMSException {
-        sendPullCommand();
         checkClosed();
         checkMessageListener();
         if (timeout == 0) {
             return this.receive();
 
         }
-
+        
+        sendPullCommand(timeout);
         while (timeout > 0) {
-            MessageDispatch md = dequeue(timeout);
+               
+            MessageDispatch md;
+            if (info.getPrefetchSize() == 0) {
+               md = dequeue(-1);  // We let the broker let us know when we 
timeout.
+            } else {
+               md = dequeue(timeout);
+            }
+
             if (md == null)
                 return null;
 
@@ -492,7 +502,15 @@
     public Message receiveNoWait() throws JMSException {
         checkClosed();
         checkMessageListener();
-        MessageDispatch md = dequeue(0);
+        sendPullCommand(-1);
+        
+        MessageDispatch md;
+        if (info.getPrefetchSize() == 0) {
+               md = dequeue(-1);  // We let the broker let us know when we 
timeout.
+        } else {
+               md = dequeue(0);
+        }
+        
         if (md == null)
             return null;
 
@@ -598,10 +616,11 @@
      * we are about to receive
      *
      */
-    protected void sendPullCommand() throws JMSException {
+    protected void sendPullCommand(long timeout) throws JMSException {
         if (info.getPrefetchSize() == 0) {
             MessagePull messagePull = new MessagePull();
             messagePull.configure(info);
+            messagePull.setTimeout(timeout);            
             session.asyncSendPacket(messagePull);
         }
     }

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java?rev=439442&r1=439441&r2=439442&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java
 Fri Sep  1 12:52:18 2006
@@ -25,7 +25,7 @@
 import org.apache.activemq.command.MessageId;
 
 /**
- * Only used by the [EMAIL PROTECTED] 
QueueMessageReference#END_OF_BROWSE_MARKER} 
+ * Only used by the [EMAIL PROTECTED] QueueMessageReference#NULL_MESSAGE} 
  */
 final class EndOfBrowseMarkerQueueMessageReference implements
                QueueMessageReference {

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=439442&r1=439441&r2=439442&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 Fri Sep  1 12:52:18 2006
@@ -37,6 +37,7 @@
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.Response;
+import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.util.BrokerSupport;
 import org.apache.commons.logging.Log;
@@ -58,7 +59,7 @@
     long enqueueCounter;
     long dispatchCounter;
     long dequeueCounter;
-    
+        
     public PrefetchSubscription(Broker broker,ConnectionContext 
context,ConsumerInfo info)
                     throws InvalidSelectorException{
         super(broker,context,info);
@@ -68,16 +69,51 @@
     /**
      * Allows a message to be pulled on demand by a client
      */
-    public Response pullMessage(ConnectionContext context, MessagePull pull) 
throws Exception {
-        if (getPrefetchSize() == 0) {
+    synchronized public Response pullMessage(ConnectionContext context, 
MessagePull pull) throws Exception {
+       // The slave should not deliver pull messages.  TODO: when the slave 
becomes a master,
+       // He should send a NULL message to all the consumers to 'wake them up' 
in case 
+       // they were waiting for a message.
+        if (getPrefetchSize() == 0 && !isSlaveBroker()) {
             prefetchExtension++;
-            dispatchMatched();
             
-            // TODO it might be nice one day to actually return the message 
itself
+            final long dispatchCounterBeforePull = dispatchCounter;
+               dispatchMatched();
+               
+               // If there was nothing dispatched.. we may need to setup a 
timeout.
+               if( dispatchCounterBeforePull == dispatchCounter ) {
+                       // imediate timeout used by receiveNoWait()
+                       if( pull.getTimeout() == -1 ) {
+                               // Send a NULL message.
+                       add(QueueMessageReference.NULL_MESSAGE);
+                       dispatchMatched();
+                       }
+                       if( pull.getTimeout() > 0 ) {
+                       Scheduler.executeAfterDelay(new Runnable(){
+                                                       public void run() {
+                                                               
pullTimeout(dispatchCounterBeforePull);
+                                                       }
+                                               }, pull.getTimeout());
+                       }
+               }
         }
         return null;
     }
     
+    /**
+     * Occurs when a pull times out.  If nothing has been dispatched
+     * since the timeout was setup, then send the NULL message.
+     */
+    synchronized private void pullTimeout(long dispatchCounterBeforePull) {    
        
+       if( dispatchCounterBeforePull == dispatchCounter ) {
+               try {
+                               add(QueueMessageReference.NULL_MESSAGE);
+                               dispatchMatched();
+                       } catch (Exception e) {
+                               context.getConnection().serviceException(e);
+                       }
+       }
+       }
+        
     synchronized public void add(MessageReference node) throws Exception{
         enqueueCounter++;
         if(!isFull()){
@@ -311,9 +347,17 @@
         }
         // Make sure we can dispatch a message.
         if(canDispatch(node)&&!isSlaveBroker()){
-            dispatchCounter++;
+               
             MessageDispatch md=createMessageDispatch(node,message);
-            dispatched.addLast(node);            
+
+            // NULL messages don't count... they don't get Acked.
+            if( node != QueueMessageReference.NULL_MESSAGE ) {
+                       dispatchCounter++;
+                       dispatched.addLast(node);            
+            } else {
+               prefetchExtension=Math.max(0,prefetchExtension-1);
+            }
+            
             if(info.isDispatchAsync()){
                 md.setConsumer(new Runnable(){
                     public void run(){
@@ -335,8 +379,10 @@
 
     synchronized protected void onDispatch(final MessageReference node,final 
Message message){
         if(node.getRegionDestination()!=null){
-            
node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
-            context.getConnection().getStatistics().onMessageDequeue(message);
+               if( node != QueueMessageReference.NULL_MESSAGE ) {
+                   
node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
+                   
context.getConnection().getStatistics().onMessageDequeue(message);
+               }
             try{
                 dispatchMatched();
             }catch(IOException e){
@@ -365,12 +411,20 @@
      * @return
      */
     protected MessageDispatch createMessageDispatch(MessageReference 
node,Message message){
-        MessageDispatch md=new MessageDispatch();
-        md.setConsumerId(info.getConsumerId());
-        
md.setDestination(node.getRegionDestination().getActiveMQDestination());
-        md.setMessage(message);
-        md.setRedeliveryCounter(node.getRedeliveryCounter());
-        return md;
+        if( node == QueueMessageReference.NULL_MESSAGE ) {
+            MessageDispatch md = new MessageDispatch();
+            md.setMessage(null);
+            md.setConsumerId( info.getConsumerId() );
+            md.setDestination( null );
+            return md;
+        } else {
+            MessageDispatch md=new MessageDispatch();
+            md.setConsumerId(info.getConsumerId());
+            
md.setDestination(node.getRegionDestination().getActiveMQDestination());
+            md.setMessage(message);
+            md.setRedeliveryCounter(node.getRedeliveryCounter());
+            return md;
+        }
     }
 
     /**

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java?rev=439442&r1=439441&r2=439442&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java
 Fri Sep  1 12:52:18 2006
@@ -17,16 +17,14 @@
  */
 package org.apache.activemq.broker.region;
 
-import javax.jms.InvalidSelectorException;
-
 import java.io.IOException;
 
+import javax.jms.InvalidSelectorException;
+
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.filter.MessageEvaluationContext;
 
 public class QueueBrowserSubscription extends QueueSubscription {
@@ -53,19 +51,7 @@
 
     public void browseDone() throws Exception {
         browseDone = true;
-        add(QueueMessageReference.END_OF_BROWSE_MARKER);
-    }
-    
-    protected MessageDispatch createMessageDispatch(MessageReference node, 
Message message) {
-        if( node == QueueMessageReference.END_OF_BROWSE_MARKER ) {
-            MessageDispatch md = new MessageDispatch();
-            md.setMessage(null);
-            md.setConsumerId( info.getConsumerId() );
-            md.setDestination( null );
-            return md;
-        } else {
-            return super.createMessageDispatch(node, message);
-        }
+        add(QueueMessageReference.NULL_MESSAGE);
     }
     
     public boolean matches(MessageReference node, MessageEvaluationContext 
context) throws IOException {

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java?rev=439442&r1=439441&r2=439442&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java
 Fri Sep  1 12:52:18 2006
@@ -25,7 +25,7 @@
  */
 public interface QueueMessageReference extends MessageReference {
 
-    public static final QueueMessageReference END_OF_BROWSE_MARKER = new 
EndOfBrowseMarkerQueueMessageReference();
+    public static final QueueMessageReference NULL_MESSAGE = new 
EndOfBrowseMarkerQueueMessageReference();
 
     public boolean isAcked();
     


Reply via email to