Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x b6ec8245c -> 0b24d0b69


ARTEMIS-1999 Broker uses 100% core's CPU time if msg grouping is used

When message grouping is used and many consumers are registered, the
delivery loop never gives up trying to deliver messages once
back-pressure (credits and/or TCP) kicks in. This change lets the loop
exit by telling the delivery logic that the group consumer is the only
consumer to check.

(cherry picked from commit 8dd0e9472fc9eaf08c3d64c3935aeafbf04a422a)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0b24d0b6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0b24d0b6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0b24d0b6

Branch: refs/heads/2.6.x
Commit: 0b24d0b6925db0ad3458f077b087735b6087254a
Parents: b6ec824
Author: Francesco Nigro <nigro....@gmail.com>
Authored: Tue Jul 31 11:16:26 2018 +0200
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Wed Aug 8 15:45:13 2018 -0400

----------------------------------------------------------------------
 .../artemis/core/server/impl/QueueImpl.java     |  9 ++-
 .../unit/core/server/impl/QueueImplTest.java    | 60 ++++++++++++++++++++
 2 files changed, 67 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0b24d0b6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index ccb8267..c80d4f3 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2365,7 +2365,12 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
                }
             }
 
-            if (pos == endPos) {
+            if (groupConsumer != null || exclusive) {
+               if (noDelivery > 0) {
+                  break;
+               }
+               noDelivery = 0;
+            } else if (pos == endPos) {
                // Round robin'd all
 
                if (noDelivery == size) {
@@ -2917,7 +2922,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
                return true;
             }
 
-            if (pos == startPos) {
+            if (pos == startPos || groupConsumer != null || exclusive) {
                // Tried them all
                break;
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0b24d0b6/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
----------------------------------------------------------------------
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
index 0aa6e5c..b0987aa 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -1289,6 +1290,65 @@ public class QueueImplTest extends ActiveMQTestBase {
       }
    }
 
+   @Test
+   public void testGroupMessageWithManyConsumers() throws Exception {
+      final CountDownLatch firstMessageHandled = new CountDownLatch(1);
+      final CountDownLatch finished = new CountDownLatch(2);
+      final Consumer groupConsumer = new FakeConsumer() {
+
+         int count = 0;
+
+         @Override
+         public synchronized HandleStatus handle(MessageReference reference) {
+            if (count == 0) {
+               //the first message is handled and will be used to determine this consumer
+               //to be the group consumer
+               count++;
+               firstMessageHandled.countDown();
+               return HandleStatus.HANDLED;
+            } else if (count <= 2) {
+               //the next two attempts to send the second message will be done
+               //attempting a direct delivery and an async one after that
+               count++;
+               finished.countDown();
+               return HandleStatus.BUSY;
+            } else {
+               //this shouldn't happen, because the last attempt to deliver
+               //the second message should have stopped the delivery loop:
+               //it will succeed just to let the message be handled and
+               //reduce the message count to 0
+               return HandleStatus.HANDLED;
+            }
+         }
+      };
+      final Consumer noConsumer = new FakeConsumer() {
+         @Override
+         public synchronized HandleStatus handle(MessageReference reference) {
+            Assert.fail("this consumer isn't allowed to consume any message");
+            throw new AssertionError();
+         }
+      };
+      final QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), 
QueueImplTest.queue1,
+                                            null, null, false, true, false,
+                                            scheduledExecutor, null, null, 
null,
+                                            
ArtemisExecutor.delegate(executor), null, null);
+      queue.addConsumer(groupConsumer);
+      queue.addConsumer(noConsumer);
+      final MessageReference firstMessageReference = generateReference(queue, 
1);
+      final SimpleString groupName = SimpleString.toSimpleString("group");
+      
firstMessageReference.getMessage().putStringProperty(Message.HDR_GROUP_ID, 
groupName);
+      final MessageReference secondMessageReference = generateReference(queue, 
2);
+      
secondMessageReference.getMessage().putStringProperty(Message.HDR_GROUP_ID, 
groupName);
+      queue.addTail(firstMessageReference, true);
+      Assert.assertTrue("first message isn't handled", 
firstMessageHandled.await(3000, TimeUnit.MILLISECONDS));
+      Assert.assertEquals("group consumer isn't correctly set", groupConsumer, 
queue.getGroups().get(groupName));
+      queue.addTail(secondMessageReference, true);
+      final boolean atLeastTwoDeliverAttempts = finished.await(3000, 
TimeUnit.MILLISECONDS);
+      Assert.assertTrue(atLeastTwoDeliverAttempts);
+      Thread.sleep(1000);
+      Assert.assertEquals("The second message should be in the queue", 1, 
queue.getMessageCount());
+   }
+
    private QueueImpl getNonDurableQueue() {
       return getQueue(QueueImplTest.queue1, false, false, null);
    }

Reply via email to