Repository: activemq-artemis Updated Branches: refs/heads/2.6.x b6ec8245c -> 0b24d0b69
ARTEMIS-1999 Broker uses 100% core's CPU time if msg grouping is used The deliver loop won't give up trying to deliver messages when back-pressure kicks in (credits and/or TCP) if msg grouping is used and there are many consumers registered: this change will allow the loop to exit by instructing the logic that the group consumer is the only consumer to check. (cherry picked from commit 8dd0e9472fc9eaf08c3d64c3935aeafbf04a422a) Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0b24d0b6 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0b24d0b6 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0b24d0b6 Branch: refs/heads/2.6.x Commit: 0b24d0b6925db0ad3458f077b087735b6087254a Parents: b6ec824 Author: Francesco Nigro <nigro....@gmail.com> Authored: Tue Jul 31 11:16:26 2018 +0200 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Wed Aug 8 15:45:13 2018 -0400 ---------------------------------------------------------------------- .../artemis/core/server/impl/QueueImpl.java | 9 ++- .../unit/core/server/impl/QueueImplTest.java | 60 ++++++++++++++++++++ 2 files changed, 67 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0b24d0b6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index ccb8267..c80d4f3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -2365,7 +2365,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } - if (pos == endPos) { + if (groupConsumer != null || exclusive) { + if (noDelivery > 0) { + break; + } + noDelivery = 0; + } else if (pos == endPos) { // Round robin'd all if (noDelivery == size) { @@ -2917,7 +2922,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return true; } - if (pos == startPos) { + if (pos == startPos || groupConsumer != null || exclusive) { // Tried them all break; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0b24d0b6/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java index 0aa6e5c..b0987aa 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; @@ -1289,6 +1290,65 @@ public class QueueImplTest extends ActiveMQTestBase { } } + @Test + public void testGroupMessageWithManyConsumers() throws Exception { + final CountDownLatch firstMessageHandled = new CountDownLatch(1); + final CountDownLatch finished = new CountDownLatch(2); + final Consumer groupConsumer = new FakeConsumer() { + + int count = 0; + + @Override + public synchronized HandleStatus handle(MessageReference reference) { + if (count == 0) { + //the first message is handled and will be used to determine this consumer + //to be the group consumer + count++; + firstMessageHandled.countDown(); + return HandleStatus.HANDLED; + } else if (count <= 2) { + //the next two attempts to send the second message will be done + //attempting a direct delivery and an async one after that + count++; + finished.countDown(); + return HandleStatus.BUSY; + } else { + //this shouldn't happen, because the last attempt to deliver + //the second message should have stop the delivery loop: + //it will succeed just to let the message being handled and + //reduce the message count to 0 + return HandleStatus.HANDLED; + } + } + }; + final Consumer noConsumer = new FakeConsumer() { + @Override + public synchronized HandleStatus handle(MessageReference reference) { + Assert.fail("this consumer isn't allowed to consume any message"); + throw new AssertionError(); + } + }; + final QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1, + null, null, false, true, false, + scheduledExecutor, null, null, null, + ArtemisExecutor.delegate(executor), null, null); + queue.addConsumer(groupConsumer); + queue.addConsumer(noConsumer); + final MessageReference firstMessageReference = generateReference(queue, 1); + final SimpleString groupName = SimpleString.toSimpleString("group"); + firstMessageReference.getMessage().putStringProperty(Message.HDR_GROUP_ID, groupName); + final MessageReference secondMessageReference = generateReference(queue, 2); + secondMessageReference.getMessage().putStringProperty(Message.HDR_GROUP_ID, groupName); + queue.addTail(firstMessageReference, true); + Assert.assertTrue("first message isn't handled", firstMessageHandled.await(3000, TimeUnit.MILLISECONDS)); + Assert.assertEquals("group consumer isn't correctly set", groupConsumer, queue.getGroups().get(groupName)); + queue.addTail(secondMessageReference, true); + final boolean atLeastTwoDeliverAttempts = finished.await(3000, TimeUnit.MILLISECONDS); + Assert.assertTrue(atLeastTwoDeliverAttempts); + Thread.sleep(1000); + Assert.assertEquals("The second message should be in the queue", 1, queue.getMessageCount()); + } + private QueueImpl getNonDurableQueue() { return getQueue(QueueImplTest.queue1, false, false, null); }