Author: chirino
Date: Wed Mar 26 12:56:29 2008
New Revision: 641525
URL: http://svn.apache.org/viewvc?rev=641525&view=rev
Log:
When messages expire, take them out of the paged-in list so that we can dispatch
more messages to other consumers.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=641525&r1=641524&r2=641525&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
Wed Mar 26 12:56:29 2008
@@ -111,4 +111,6 @@
* @param value
*/
public void setLazyDispatch(boolean value);
+
+ void messageExpired(ConnectionContext context, PrefetchSubscription
prefetchSubscription, MessageReference node);
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=641525&r1=641524&r2=641525&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
Wed Mar 26 12:56:29 2008
@@ -202,4 +202,8 @@
public void setLazyDispatch(boolean value) {
next.setLazyDispatch(value);
}
+
+ public void messageExpired(ConnectionContext context, PrefetchSubscription
prefetchSubscription, MessageReference node) {
+ next.messageExpired(context, prefetchSubscription, node);
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=641525&r1=641524&r2=641525&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Wed Mar 26 12:56:29 2008
@@ -246,12 +246,17 @@
// the
// acknowledgment.
int index = 0;
- for (Iterator<MessageReference> iter = dispatched.iterator();
iter
- .hasNext(); index++) {
+ for (Iterator<MessageReference> iter = dispatched.iterator();
iter.hasNext(); index++) {
final MessageReference node = iter.next();
+ if( node.isExpired() ) {
+ broker.messageExpired(getContext(), node);
+ node.getRegionDestination().messageExpired(context,
this, node);
+
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
+
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
+ dispatched.remove(node);
+ }
if (ack.getLastMessageId().equals(node.getMessageId())) {
- prefetchExtension = Math.max(prefetchExtension,
- index + 1);
+ prefetchExtension = Math.max(prefetchExtension, index
+ 1);
callDispatchMatched = true;
break;
}
@@ -471,12 +476,11 @@
// Message may have been sitting in the pending
// list a while waiting for the consumer to ak
the message.
- if (node != QueueMessageReference.NULL_MESSAGE
- && node.isExpired()) {
+ if (node!=QueueMessageReference.NULL_MESSAGE
&& node.isExpired()) {
broker.messageExpired(getContext(), node);
- dequeueCounter++;
//increment number to dispatch
numberToDispatch++;
+
node.getRegionDestination().messageExpired(context, this, node);
continue;
}
dispatch(node);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=641525&r1=641524&r2=641525&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Wed Mar 26 12:56:29 2008
@@ -1003,7 +1003,17 @@
}
wakeup();
}
-
+
+ public void messageExpired(ConnectionContext context, PrefetchSubscription
prefetchSubscription, MessageReference reference) {
+ ((QueueMessageReference)reference).drop();
+ // Not sure.. perhaps we should forge an ack to remove the message
from the store.
+ // acknowledge(context, sub, ack, reference);
+ destinationStatistics.getMessages().decrement();
+ synchronized(pagedInMessages) {
+ pagedInMessages.remove(reference.getMessageId());
+ }
+ wakeup();
+ }
protected ConnectionContext createConnectionContext() {
ConnectionContext answer = new ConnectionContext(new
NonCachedMessageEvaluationContext());
@@ -1037,7 +1047,7 @@
dispatchLock.lock();
try{
- int toPageIn = getMaxPageSize() - pagedInMessages.size();
+ int toPageIn =
(getMaxPageSize()+(int)destinationStatistics.getInflight().getCount()) -
pagedInMessages.size();
if (isLazyDispatch()&& !force) {
// Only page in the minimum number of messages which can be
dispatched immediately.
toPageIn = Math.min(getConsumerMessageCountBeforeFull(),
toPageIn);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=641525&r1=641524&r2=641525&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Wed Mar 26 12:56:29 2008
@@ -631,4 +631,9 @@
}
}
+ public void messageExpired(ConnectionContext context, PrefetchSubscription
prefetchSubscription, MessageReference node) {
+ // TODO Auto-generated method stub
+
+ }
+
}