[ 
https://issues.apache.org/activemq/browse/AMQ-2908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=61768#action_61768
 ] 

Siim Kaalep commented on AMQ-2908:
----------------------------------

{quote}
do you have a test case for this?
{quote}
Regrettably, no.

> Slow consumer stops receiving messages because 
> PrefetchSubscription.dispatched is filled with expired messages.
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-2908
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2908
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.3.2
>            Reporter: Siim Kaalep
>
> A slow consumer gets stuck when consuming from a queue that contains 
> expiring messages. 
> Inspecting the broker while the consumer was stuck showed that 
> PrefetchSubscription.dispatched was full of expired messages.
> WORKAROUND
> Added a check to doActualDispatch: if the subscription is full, it first 
> removes all expired messages from dispatched.
> {code}
> Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
> ===================================================================
> --- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java	(revision 42304)
> +++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java	(working copy)
> @@ -400,6 +400,21 @@
>          }
>      }
>  
> +    public void removeExpiredMessagesFromDispatch() {
> +        synchronized(dispatchLock) {
> +            for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); ) {
> +                final MessageReference node = iter.next();
> +                if (node.isExpired()) {
> +                    if (broker.isExpired(node)) {
> +                        node.getRegionDestination().messageExpired(context, this, node);
> +                    }
> +                    dispatched.remove(node);
> +                    node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
> +                }
> +            }
> +        }
> +    }
> +    
>      /**
>       * Checks an ack versus the contents of the dispatched list.
>       * 
> {code}
> {code}
> Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
> ===================================================================
> --- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java	(revision 42304)
> +++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java	(working copy)
> @@ -1543,6 +1543,9 @@
>                  }
>                  if (dispatchSelector.canSelect(s, node)) {
>                      if (!fullConsumers.contains(s)) {
> +                        if (s.isFull() && s instanceof PrefetchSubscription) {
> +                            ((PrefetchSubscription)s).removeExpiredMessagesFromDispatch();
> +                        }
>                          if (!s.isFull()) {
>                              // Dispatch it.
>                              s.add(node);
> {code}
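
The behaviour described above can be illustrated outside the broker with a small stand-alone model. This is only a sketch under assumptions: PrefetchWindowSketch, Msg, tryDispatch and removeExpired are hypothetical names, not ActiveMQ API, the window is a plain ArrayList, and the broker's locking, acknowledgements and statistics are not modelled. It shows a full window refusing new messages until its expired entries are pruned.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model of a prefetch window; every name here is hypothetical, not ActiveMQ API. */
public class PrefetchWindowSketch {

    /** A message reference that only knows its id and expiry time (ms). */
    public static final class Msg {
        final String id;
        final long expiresAt;

        public Msg(String id, long expiresAt) {
            this.id = id;
            this.expiresAt = expiresAt;
        }

        boolean isExpired(long now) {
            return now >= expiresAt;
        }
    }

    private final int prefetchLimit;
    private final List<Msg> dispatched = new ArrayList<>();

    public PrefetchWindowSketch(int prefetchLimit) {
        this.prefetchLimit = prefetchLimit;
    }

    public boolean isFull() {
        return dispatched.size() >= prefetchLimit;
    }

    /** Drops expired entries from the window and returns how many were removed. */
    public int removeExpired(long now) {
        int removed = 0;
        for (Iterator<Msg> it = dispatched.iterator(); it.hasNext(); ) {
            if (it.next().isExpired(now)) {
                it.remove(); // Iterator.remove is supported by ArrayList
                removed++;
            }
        }
        return removed;
    }

    /** Mirrors the patched dispatch step: prune only when full, then dispatch if there is room. */
    public boolean tryDispatch(Msg msg, long now) {
        if (isFull()) {
            removeExpired(now);
        }
        if (isFull()) {
            return false; // window still full of live messages
        }
        dispatched.add(msg);
        return true;
    }

    public int inFlight() {
        return dispatched.size();
    }

    public static void main(String[] args) {
        PrefetchWindowSketch sub = new PrefetchWindowSketch(2);
        sub.tryDispatch(new Msg("a", 100), 0);
        sub.tryDispatch(new Msg("b", 100), 0);
        // t=50: both entries are still live, so the window stays full.
        System.out.println(sub.tryDispatch(new Msg("c", 500), 50));  // false
        // t=200: both entries have expired, are pruned, and "c" is accepted.
        System.out.println(sub.tryDispatch(new Msg("c", 500), 200)); // true
        System.out.println(sub.inFlight());                          // 1
    }
}
```

Without the pruning step in tryDispatch, the second attempt at t=200 would also be refused, which corresponds to the stuck consumer reported here.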

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
