Slow consumer stops receiving messages because PrefetchSubscription.dispatched
is filled with expired messages.
---------------------------------------------------------------------------------------------------------------
Key: AMQ-2908
URL: https://issues.apache.org/activemq/browse/AMQ-2908
Project: ActiveMQ
Issue Type: Bug
Components: Broker
Affects Versions: 5.3.2
Reporter: Siim Kaalep
A slow consumer gets stuck when consuming from a queue that contains expiring
messages.
Inspecting the broker while the consumer was stuck showed that
PrefetchSubscription.dispatched was full of expired messages.
WORKAROUND
Added a check to doActualDispatch: if the subscription is full, all expired
messages are first removed from its dispatched list.
{code}
Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
===================================================================
--- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java	(revision 42304)
+++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java	(working copy)
@@ -400,6 +400,21 @@
         }
     }
+    public void removeExpiredMessagesFromDispatch() {
+        synchronized(dispatchLock) {
+            for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); ) {
+                final MessageReference node = iter.next();
+                if (node.isExpired()) {
+                    if (broker.isExpired(node)) {
+                        node.getRegionDestination().messageExpired(context, this, node);
+                    }
+                    dispatched.remove(node);
+                    node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
+                }
+            }
+        }
+    }
+
     /**
      * Checks an ack versus the contents of the dispatched list.
      *
{code}
{code}
Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
===================================================================
--- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java	(revision 42304)
+++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java	(working copy)
@@ -1543,6 +1543,9 @@
                     }
                     if (dispatchSelector.canSelect(s, node)) {
                         if (!fullConsumers.contains(s)) {
+                            if (s.isFull() && s instanceof PrefetchSubscription) {
+                                ((PrefetchSubscription)s).removeExpiredMessagesFromDispatch();
+                            }
                             if (!s.isFull()) {
                                 // Dispatch it.
                                 s.add(node);
{code}
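To illustrate the failure mode and the workaround outside the broker, here is a
minimal standalone sketch. It is not ActiveMQ code: PrefetchWindowSketch, Ref,
tryDispatch and removeExpired are hypothetical names, the window is a plain
ArrayList, and the sketch omits the expiry notification and in-flight statistics
that the patch above updates. It only shows that a window full of expired
entries refuses new messages until those entries are purged.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical, simplified model of a prefetch window; not ActiveMQ code. */
final class PrefetchWindowSketch {

    /** A message reference with an absolute expiry time in ms (0 = never expires). */
    static final class Ref {
        final String id;
        final long expiresAt;

        Ref(String id, long expiresAt) {
            this.id = id;
            this.expiresAt = expiresAt;
        }

        boolean isExpired(long now) {
            return expiresAt > 0 && now >= expiresAt;
        }
    }

    private final int prefetchSize;
    private final List<Ref> dispatched = new ArrayList<>();

    PrefetchWindowSketch(int prefetchSize) {
        this.prefetchSize = prefetchSize;
    }

    /** True when the consumer holds as many unacknowledged messages as allowed. */
    synchronized boolean isFull() {
        return dispatched.size() >= prefetchSize;
    }

    /** Drops expired entries from the window and returns how many were removed. */
    synchronized int removeExpired(long now) {
        int removed = 0;
        for (Iterator<Ref> it = dispatched.iterator(); it.hasNext(); ) {
            if (it.next().isExpired(now)) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    /**
     * Tries to hand a message to the consumer. With purgeWhenFull set, a full
     * window is first cleared of expired entries, mirroring the workaround.
     */
    synchronized boolean tryDispatch(Ref ref, long now, boolean purgeWhenFull) {
        if (purgeWhenFull && isFull()) {
            removeExpired(now);
        }
        if (isFull()) {
            return false; // window still full: the consumer receives nothing
        }
        dispatched.add(ref);
        return true;
    }
}
```

With a prefetch size of 2 and two entries that have both expired, tryDispatch
with purgeWhenFull set to false keeps returning false, which matches the stuck
consumer described above. With purgeWhenFull set to true, the same call succeeds.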