[ https://issues.apache.org/activemq/browse/AMQ-2908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=62611#action_62611 ]

Paul Morris commented on AMQ-2908:
----------------------------------

Hmmm, I am wondering if I am seeing the same issue here. Unfortunately the
original stack trace, or any indication of why the user thinks this is a bug,
was not included, but I am seeing the same sort of thing and the broker
produced the following stack trace:

2010-09-05 09:07:51,844 | ERROR | Failed to page in more queue messages  | org.apache.activemq.broker.region.Queue | Queue:CORE-IN-QUEUE
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Unknown Source)
        at java.util.concurrent.CopyOnWriteArrayList.add(Unknown Source)
        at org.apache.activemq.broker.region.PrefetchSubscription.dispatch(PrefetchSubscription.java:630)
        at org.apache.activemq.broker.region.PrefetchSubscription.dispatchPending(PrefetchSubscription.java:592)
        at org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:158)
        at org.apache.activemq.broker.region.Queue.doActualDispatch(Queue.java:1548)
        at org.apache.activemq.broker.region.Queue.doDispatch(Queue.java:1500)
        at org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:1585)
        at org.apache.activemq.broker.region.Queue.iterate(Queue.java:1219)
        at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98)
        at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)

We are using 5.3.1, by the way, and I will hopefully upgrade to 5.4.1 next week
and retry.

In our case we have very slow consumers under certain conditions, because they
receive the message and post it out to a web endpoint, and at various points
during the year the web endpoint is taken offline, which causes the problem
above.

We do have a test case here, but it is tied to stopping and starting our
services and pulling the IP stack down, so it is difficult to post here, and
at 300 messages per second it takes about 2 days to actually kill the system.
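
For scale, here is a back-of-the-envelope estimate of how many messages such a
run pushes through the broker. It assumes a constant 300 messages per second
for exactly 48 hours, which is a simplification of "about 2 days"; the class
and method names are made up for illustration.

```java
import java.time.Duration;

// Hypothetical helper, not part of ActiveMQ: estimates message volume
// for a constant send rate over a fixed window.
class BacklogEstimate {
    static long messagesProduced(long ratePerSecond, Duration window) {
        // multiplyExact throws on overflow instead of silently wrapping.
        return Math.multiplyExact(ratePerSecond, window.getSeconds());
    }

    public static void main(String[] args) {
        // 300 msg/s * 172800 s = 51840000
        System.out.println(messagesProduced(300, Duration.ofDays(2)));
    }
}
```

That is roughly 52 million messages before the failure, so even a small amount
of state retained per message could plausibly add up to the heap exhaustion
seen in the trace.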

> Slow consumer stops receiving messages because 
> PrefetchSubscription.dispatched is filled with expired messages.
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-2908
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2908
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.3.2
>            Reporter: Siim Kaalep
>            Assignee: Gary Tully
>
> A slow consumer gets stuck when consuming from a queue that has expiring
> messages in it.
> I looked into the broker while it was stuck and saw that
> PrefetchSubscription.dispatched was full of expired messages.
> WORKAROUND
> I added a check to doActualDispatch: if the subscription is full, it removes
> all expired messages from the dispatched list.
> {code}
> Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
> ===================================================================
> --- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java	(revision 42304)
> +++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java	(working copy)
> @@ -400,6 +400,21 @@
>          }
>      }
>  
> +    public void removeExpiredMessagesFromDispatch() {
> +        synchronized(dispatchLock) {
> +            for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); ) {
> +                final MessageReference node = iter.next();
> +                if (node.isExpired()) {
> +                    if (broker.isExpired(node)) {
> +                        node.getRegionDestination().messageExpired(context, this, node);
> +                    }
> +                    dispatched.remove(node);
> +                    node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
> +                }
> +            }
> +        }
> +    }
> +    
>      /**
>       * Checks an ack versus the contents of the dispatched list.
>       * 
> {code}
> {code}
> Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
> ===================================================================
> --- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java	(revision 42304)
> +++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java	(working copy)
> @@ -1543,6 +1543,9 @@
>                  }
>                  if (dispatchSelector.canSelect(s, node)) {
>                      if (!fullConsumers.contains(s)) {
> +                        if (s.isFull() && s instanceof PrefetchSubscription) {
> +                            ((PrefetchSubscription)s).removeExpiredMessagesFromDispatch();
> +                        }
>                          if (!s.isFull()) {
>                              // Dispatch it.
>                              s.add(node);
> {code}
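
The idea behind the quoted workaround can be modelled without a broker. The
sketch below is not ActiveMQ code: PrefetchModel, Ref, offer and removeExpired
are hypothetical names, and the list type is an assumption taken from the
CopyOnWriteArrayList frames in the stack trace above. It only shows how a full
prefetch window of expired entries blocks dispatch until they are purged.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for a prefetch subscription's bookkeeping.
class PrefetchModel {
    // A dispatched but unacknowledged message with an absolute expiry time.
    static final class Ref {
        final String id;
        final long expiresAtMillis;

        Ref(String id, long expiresAtMillis) {
            this.id = id;
            this.expiresAtMillis = expiresAtMillis;
        }

        boolean isExpired(long now) {
            return now >= expiresAtMillis;
        }
    }

    private final int prefetchSize;
    private final List<Ref> dispatched = new CopyOnWriteArrayList<>();

    PrefetchModel(int prefetchSize) {
        this.prefetchSize = prefetchSize;
    }

    boolean isFull() {
        return dispatched.size() >= prefetchSize;
    }

    // Same shape as the quoted check: purge only when the window is full,
    // then dispatch if there is room.
    boolean offer(Ref ref, long now) {
        if (isFull()) {
            removeExpired(now);
        }
        if (isFull()) {
            return false; // still full of live messages; consumer must ack
        }
        dispatched.add(ref);
        return true;
    }

    int removeExpired(long now) {
        int removed = 0;
        // CopyOnWriteArrayList iterators walk a snapshot, so removing from
        // the list inside the loop does not disturb the iteration.
        for (Ref ref : dispatched) {
            if (ref.isExpired(now) && dispatched.remove(ref)) {
                removed++;
            }
        }
        return removed;
    }

    int inFlight() {
        return dispatched.size();
    }
}
```

With a prefetch size of 2 and two entries that expire at t=100, an offer at
t=50 is refused, while an offer at t=200 purges both entries and succeeds.
Without the purge step the second offer would be refused as well, which
matches the stuck consumer described in the issue.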

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.