[ 
https://issues.apache.org/activemq/browse/AMQ-2908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=63092#action_63092
 ] 

Siim Kaalep commented on AMQ-2908:
----------------------------------

After performing more tests, I also observed OutOfMemoryError occurring. 
I would like to raise the theoretical possibility that the OutOfMemoryError 
killed a thread that was supposed to perform maintenance on the dispatched 
list, leading to a situation in which expired messages clogged the processing 
of the dispatched list.

If that is the case, then the current fix is redundant and the real 
problem is caused by the OutOfMemoryError.
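
To illustrate the failure mode I have in mind, here is a minimal standalone 
sketch. It is not ActiveMQ code, and I have not confirmed that the broker 
maintains the dispatched list with a task like this; the class name, the 
10 ms period, and the simulated error are all assumptions made for the 
example. It only shows that a periodic task on a ScheduledExecutorService 
stops being rescheduled, without any log output, once one run throws an 
Error such as OutOfMemoryError.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a broker maintenance task; not ActiveMQ code.
class MaintenanceTaskDeath {

    /**
     * Schedules a periodic task that throws a simulated OutOfMemoryError on
     * its third run, then returns how many runs actually happened.
     */
    static int runsBeforeSilentStop() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        try {
            ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
                if (runs.incrementAndGet() == 3) {
                    // Simulated; a real OutOfMemoryError would come from the JVM.
                    throw new OutOfMemoryError("simulated");
                }
            }, 0, 10, TimeUnit.MILLISECONDS);

            try {
                // Blocks until the periodic task terminates abnormally.
                future.get();
            } catch (ExecutionException expected) {
                // The cause is the simulated OutOfMemoryError.
            }

            // Wait well past several periods to show no further runs occur.
            Thread.sleep(200);
            return runs.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("runs = " + runsBeforeSilentStop());
    }
}
```

The executor itself stays alive and nothing is printed unless someone calls 
get() on the returned future, so a stalled maintenance task of this kind 
would be easy to miss in the broker logs.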

> Slow consumer stops receiving messages because 
> PrefetchSubscription.dispatched is filled with expired messages.
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-2908
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2908
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.3.2
>            Reporter: Siim Kaalep
>            Assignee: Gary Tully
>
> A slow consumer gets stuck when consuming from a queue that has expiring 
> messages in it. 
> I looked into the broker while it was stuck and saw that 
> PrefetchSubscription.dispatched was full of expired messages.
> WORKAROUND
> I added a check to doActualDispatch: if the subscription is full, it 
> removes all expired messages from the dispatched list.
> {code}
> Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
> ===================================================================
> --- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java    (revision 42304)
> +++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java    (working copy)
> @@ -400,6 +400,21 @@
>          }
>      }
>  
> +   public void removeExpiredMessagesFromDispatch() {
> +     synchronized(dispatchLock) {
> +                  for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); ) {
> +                    final MessageReference node = iter.next();
> +                    if (node.isExpired()) {
> +                        if (broker.isExpired(node)) {
> +                            node.getRegionDestination().messageExpired(context, this, node);
> +                        }
> +                        dispatched.remove(node);
> +                        node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
> +                    }
> +                  }
> +     }
> +   }
> +    
>      /**
>       * Checks an ack versus the contents of the dispatched list.
>       * 
> {code}
> {code}
> Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
> ===================================================================
> --- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java    (revision 42304)
> +++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java    (working copy)
> @@ -1543,6 +1543,9 @@
>                  }
>                  if (dispatchSelector.canSelect(s, node)) {
>                      if (!fullConsumers.contains(s)) {
> +                               if (s.isFull() && s instanceof PrefetchSubscription) {
> +                                             ((PrefetchSubscription)s).removeExpiredMessagesFromDispatch();
> +                               }
>                          if (!s.isFull()) {
>                              // Dispatch it.
>                              s.add(node);
> {code}

