Hi David,

I think this is a valid patch. What I'm looking at at the moment is only adding messages to a QueueSubscriber's pending list which it can dispatch - rather than lots checking to see if they are able to dispatch it.

cheers,

Rob
On Feb 13, 2008, at 6:22 AM, David Sitsky wrote:

Hi Rob,

I changed the condition for when to check the "trash" list to:

              if (count > 0 || trash.size() > 1000)

this gave much better performance for a range of application data. I've re-attached the patch again to avoid confusion. As I said before - the broker consumes far less CPU now than before, so I am able to add a lot more consumers now.

Any thoughts?  Are there better ways of implementing this?

Cheers,
David

David Sitsky wrote:
Hi Rob,
I was using a version that did have you most recent changes.
To give you a better idea of what I meant, I hacked up some changes which you can see from the attached patch. The idea is instead of going through the pending list performing the same computations over and over again on messages which have been already been handled by other subscriptions, to move them to another list. For a particular run, this reduced my application run-time from 47 minutes to 38 minutes. I'm sure there are better ways of implementing this - but do you see what I mean?
Cheers,
David
Rob Davies wrote:
David,

which release are you working on ? There was a change last night in Queue's that might affect the cpu usage.
On Feb 8, 2008, at 5:11 PM, David Sitsky wrote:

In my application, I have noticed with 20 consumers, the broker's CPU is going through the roof, with many threads in PrefetchSubscription.dispatchPending(). With my consumers, it might be 500-1000 messages dispatched before a commit() can be called. With 20 consumers, this means there can be a build-up of 20,000 uncommited messages lying around the system, let-alone the new messages which are being pumped into the system at a furious rate. Not nice I know, but I don't have much choice about it at the moment, for application-specific reasons.

As you can imagine, I can have some very big pending queue sizes - sometimes 100,000 in size.

I am experimenting with different prefetch sizes which may help, but I suspect every time a prefetch thread is trying to dispatch a message, it might have to iterate through very large numbers of deleted messages or messages which have been claimed by other subscribers before it finds a matching message. Multiply this by 20, and there is a lot of CPU being consumed. This worries me for scalability reasons - if I want to keep bumping up the number of consumers.

I'm not sure what the best way of improving this is... is it possible when we call dispatchPending() to not call pendingMessageCursor.reset() perhaps?
reset() is a nop for the QueueStoreCursor :(


I'm trying to understand why we need to reset the cursor, when presumably all off the messages we have gone over before in a previous dispatchPending() call are either deleted, dispatched or locked by another node, and therefore don't need to be checked again (or we check if we reach the end of the cursor list)?
I


I realise if a transaction is rolled back, that a message that was previously locked by another consumer may be freed. There are probably message ordering isues too.

Is it possible when we are iterating through the cursor if we find a node locked by another consumer to perhaps move it to the end of the cursor (or another list) and check it only if we found no matches?

I'm sure there are a lot of complexities here I am not aware of - but I am curious what others think.

Doing this sort of chance should reduce the latencies and CPU usage of the broker significantly.

Cheers,
David




--
Cheers,
David

Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280 0699 Web: http://www.nuix.com Fax: +61 2 9212 6902 Index: activemq-core/src/main/java/org/apache/activemq/broker/region/ PrefetchSubscription.java
===================================================================
--- activemq-core/src/main/java/org/apache/activemq/broker/region/ PrefetchSubscription.java (revision 619666) +++ activemq-core/src/main/java/org/apache/activemq/broker/region/ PrefetchSubscription.java (working copy)
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

@@ -54,6 +55,7 @@

private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
    protected PendingMessageCursor pending;
+ protected List<MessageReference> trash = new LinkedList<MessageReference>(); protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
    protected int prefetchExtension;
    protected long enqueueCounter;
@@ -439,22 +449,41 @@

    protected void dispatchPending() throws IOException {
        if (!isSlave()) {
+           int count = 0;
           synchronized(pendingLock) {
                try {
                    int numberToDispatch = countBeforeFull();
                    if (numberToDispatch > 0) {
                        pending.setMaxBatchSize(numberToDispatch);
-                        int count = 0;
                        pending.reset();
                        while (pending.hasNext() && !isFull()
                                && count < numberToDispatch) {
                            MessageReference node = pending.next();
+                            LockOwner lockOwner;
                            if (node == null) {
                                break;
                            }
                            if(isDropped(node)) {
                                pending.remove();
                            }
+ else if (node instanceof QueueMessageReference && + (((QueueMessageReference)node).isAcked()))
+                            {
+ // Message has been acked. Move it to the trash, since it + // is unlikely to be dispatched to this subscription.
+                                pending.remove();
+                                trash.add(node);
+                            }
+ else if (node instanceof IndirectMessageReference && + (lockOwner = ((IndirectMessageReference)node).getLockOwner()) != null &&
+                                     lockOwner != this)
+                            {
+ // Message which has been locked by another subscription. + // Move it to the trash, since it is unlikely to be
+                                // dispatched to this subscription.
+                                pending.remove();
+                                trash.add(node);
+                            }
                            else if (canDispatch(node)) {
                                pending.remove();
// Message may have been sitting in the pending
@@ -475,7 +504,40 @@
                } finally {
                    pending.release();
                }
-            }
+
+ // Check if any trash can be cleaned up or some messages need to be placed
+               // back into the pending list.
+               if (count > 0 || trash.size() > 1000)
+               {
+ for (Iterator<MessageReference> iter = trash.iterator(); iter.hasNext();)
+                   {
+                       MessageReference node = iter.next();
+                       if (isDropped(node))
+                       {
+ // Message has been deleted, so it can be removed.
+                           iter.remove();
+                       }
+ else if ((node instanceof QueueMessageReference && + !((QueueMessageReference) node).isAcked()) || + (node instanceof IndirectMessageReference && + ((IndirectMessageReference) node).getLockOwner() == null))
+                       {
+ // Message is no longer acked or it is not locked by anyone + // probably due to a rolledback transaction. Re-inject it into + // the pending list again. This shouldn't be very common.
+                           try
+                           {
+                               pending.addMessageLast(node);
+                               iter.remove();
+                           }
+                           catch (Exception e)
+                           {
+ throw new IOException("Unable to add message to pending list", e);
+                           }
+                       }
+                   }
+               }
+           }
        }
    }


Reply via email to