Author: chirino
Date: Wed Mar 26 12:56:29 2008
New Revision: 641525

URL: http://svn.apache.org/viewvc?rev=641525&view=rev
Log:
When messages expire, take them out of the paged-in list so that we can dispatch 
more messages to other consumers.

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=641525&r1=641524&r2=641525&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
 Wed Mar 26 12:56:29 2008
@@ -111,4 +111,6 @@
      * @param value
      */
     public void setLazyDispatch(boolean value);
+
+    void messageExpired(ConnectionContext context, PrefetchSubscription 
prefetchSubscription, MessageReference node);
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=641525&r1=641524&r2=641525&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
 Wed Mar 26 12:56:29 2008
@@ -202,4 +202,8 @@
     public void setLazyDispatch(boolean value) {
       next.setLazyDispatch(value);        
     }
+
+    public void messageExpired(ConnectionContext context, PrefetchSubscription 
prefetchSubscription, MessageReference node) {
+        next.messageExpired(context, prefetchSubscription, node);        
+    }
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=641525&r1=641524&r2=641525&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 Wed Mar 26 12:56:29 2008
@@ -246,12 +246,17 @@
                 // the
                 // acknowledgment.
                 int index = 0;
-                for (Iterator<MessageReference> iter = dispatched.iterator(); 
iter
-                        .hasNext(); index++) {
+                for (Iterator<MessageReference> iter = dispatched.iterator(); 
iter.hasNext(); index++) {
                     final MessageReference node = iter.next();
+                    if( node.isExpired() ) {
+                        broker.messageExpired(getContext(), node);
+                        node.getRegionDestination().messageExpired(context, 
this, node);
+                        
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
+                        
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
+                        dispatched.remove(node);
+                    }
                     if (ack.getLastMessageId().equals(node.getMessageId())) {
-                        prefetchExtension = Math.max(prefetchExtension,
-                                index + 1);
+                        prefetchExtension = Math.max(prefetchExtension, index 
+ 1);
                         callDispatchMatched = true;
                         break;
                     }
@@ -471,12 +476,11 @@
 
                                 // Message may have been sitting in the pending
                                 // list a while waiting for the consumer to ak 
the message.
-                                if (node != QueueMessageReference.NULL_MESSAGE
-                                        && node.isExpired()) {
+                                if (node!=QueueMessageReference.NULL_MESSAGE 
&& node.isExpired()) {
                                     broker.messageExpired(getContext(), node);
-                                    dequeueCounter++;
                                     //increment number to dispatch
                                     numberToDispatch++;
+                                    
node.getRegionDestination().messageExpired(context, this, node);
                                     continue;
                                 }
                                 dispatch(node);

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=641525&r1=641524&r2=641525&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 Wed Mar 26 12:56:29 2008
@@ -1003,7 +1003,17 @@
         }
         wakeup();
     }
-
+    
+    public void messageExpired(ConnectionContext context, PrefetchSubscription 
prefetchSubscription, MessageReference reference) {
+        ((QueueMessageReference)reference).drop();
+        // Not sure.. perhaps we should forge an ack to remove the message 
from the store.
+        // acknowledge(context, sub, ack, reference);
+        destinationStatistics.getMessages().decrement();
+        synchronized(pagedInMessages) {
+            pagedInMessages.remove(reference.getMessageId());
+        }
+        wakeup();
+    }
     
     protected ConnectionContext createConnectionContext() {
         ConnectionContext answer = new ConnectionContext(new 
NonCachedMessageEvaluationContext());
@@ -1037,7 +1047,7 @@
         dispatchLock.lock();
         try{
         
-            int toPageIn = getMaxPageSize() - pagedInMessages.size();
+            int toPageIn = 
(getMaxPageSize()+(int)destinationStatistics.getInflight().getCount()) - 
pagedInMessages.size();
             if (isLazyDispatch()&& !force) {
              // Only page in the minimum number of messages which can be 
dispatched immediately.
              toPageIn = Math.min(getConsumerMessageCountBeforeFull(), 
toPageIn);

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=641525&r1=641524&r2=641525&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
 Wed Mar 26 12:56:29 2008
@@ -631,4 +631,9 @@
         }
     }
 
+    public void messageExpired(ConnectionContext context, PrefetchSubscription 
prefetchSubscription, MessageReference node) {
+        // TODO Auto-generated method stub
+        
+    }
+
 }


Reply via email to