Repository: activemq-artemis Updated Branches: refs/heads/master 9a52f51c9 -> f1891f162
Revert "ARTEMIS-1011 Small adjustment on test" Revert "ARTEMIS-1011 adjust slow-consumer detection logic" This reverts commit 9818206bd3aa893864eedf06d19d0c2d5c355a9c. This reverts commit 19ebbfb5f0a10daca6f2f516efae4755613254fd. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e9ad1c81 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e9ad1c81 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e9ad1c81 Branch: refs/heads/master Commit: e9ad1c81a539e4380435d5bd79e0d18d18d707d6 Parents: 9a52f51 Author: Justin Bertram <[email protected]> Authored: Fri Mar 10 08:14:58 2017 -0600 Committer: Clebert Suconic <[email protected]> Committed: Fri Mar 10 09:26:41 2017 -0500 ---------------------------------------------------------------------- .../artemis/core/server/impl/QueueImpl.java | 20 +++++----- docs/user-manual/en/slow-consumers.md | 19 +--------- .../integration/client/SlowConsumerTest.java | 40 -------------------- 3 files changed, 13 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9ad1c81/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index e04cf47..fc655f6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -233,6 +233,8 @@ public class QueueImpl implements Queue { private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis()); + private final AtomicLong messagesAddedSnapshot = new AtomicLong(0); + private ScheduledFuture slowConsumerReaperFuture; private SlowConsumerReaperRunnable slowConsumerReaperRunnable; @@ -2815,11 +2817,13 @@ public class QueueImpl implements Queue { @Override public float getRate() { + long locaMessageAdded = getMessagesAdded(); float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f); if (timeSlice == 0) { + messagesAddedSnapshot.getAndSet(locaMessageAdded); return 0.0f; } - return BigDecimal.valueOf(getMessageCount() / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue(); + return BigDecimal.valueOf((locaMessageAdded - messagesAddedSnapshot.getAndSet(locaMessageAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue(); } // Inner classes @@ -3135,19 +3139,17 @@ public class QueueImpl implements Queue { @Override public void run() { - Set<Consumer> consumersSet = getConsumers(); - - if (consumersSet.size() == 0) { - logger.debug("There are no consumers, no need to check slow consumer's rate"); - return; - } - float queueRate = getRate(); if (logger.isDebugEnabled()) { logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second."); } - if (queueRate < (threshold * consumersSet.size())) { + Set<Consumer> consumersSet = getConsumers(); + + if (consumersSet.size() == 0) { + logger.debug("There are no consumers, no need to check slow consumer's rate"); + return; + } else if (queueRate < (threshold * consumersSet.size())) { if (logger.isDebugEnabled()) { logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer."); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9ad1c81/docs/user-manual/en/slow-consumers.md ---------------------------------------------------------------------- diff --git a/docs/user-manual/en/slow-consumers.md b/docs/user-manual/en/slow-consumers.md index 5840912..b53bac5 100644 --- a/docs/user-manual/en/slow-consumers.md +++ b/docs/user-manual/en/slow-consumers.md @@ -18,23 +18,8 @@ By default the server will not detect slow consumers. If slow consumer detection is desired then see [queue attributes chapter](queue-attributes.md) for more details. -The calculation to determine whether or not a consumer is slow inspects two notable -metrics: - -1. The queue's message count. - -2. The number of messages a consumer has acknowledged. - -The queue's message count is inspected to ensure that the queue actually has had enough -messages to actually satisfy the consumer's threshold. For example, it would not be -fair to mark a consumer as "slow" if the queue received no messages. This is also notable -because in order to get an accurate message count the queue must be locked which can -negatively impact performance in high-throughput use-cases. Therefore slow-consumer -detection is only recommended on queues where it is absolutely necessary and in those -cases it may be worth tuning the `slow-consumer-check-period` to ensure it's not -running so often as to negatively impact performance. - -Finally, the algorithm inspects the number of messages a particular consumer has +The calculation to determine whether or not a consumer is slow only +inspects the number of messages a particular consumer has *acknowledged*. It doesn't take into account whether or not flow control has been enabled on the consumer, whether or not the consumer is streaming a large message, etc. Keep this in mind when configuring slow http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9ad1c81/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java index 00417ae..c81c24c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java @@ -44,7 +44,6 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; -import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.TimeUtils; @@ -245,45 +244,6 @@ public class SlowConsumerTest extends ActiveMQTestBase { } @Test - public void testSlowConsumerWithBurst() throws Exception { - ClientSessionFactory sf = createSessionFactory(locator); - - ClientSession session = addClientSession(sf.createSession(true, true)); - - ClientProducer producer = addClientProducer(session.createProducer(QUEUE)); - - final int numMessages = 20; - - for (int i = 0; i < numMessages; i++) { - producer.send(createTextMessage(session, "m" + i)); - } - - assertPaging(); - - final Queue queue = server.locateQueue(QUEUE); - - queue.getRate(); - - logger.info("Creating consumer..."); - - ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE)); - session.start(); - - Wait.waitFor(consumer::isClosed, 3000, 100); - - Assert.assertTrue(consumer.isClosed()); - - try { - consumer.receive(500); - fail("Consumer should have been killed since it's slow!"); - } catch (ActiveMQObjectClosedException e) { - // ignore - } catch (Exception e) { - fail("Wrong exception thrown"); - } - } - - @Test public void testFastThenSlowConsumerSpared() throws Exception { locator.setAckBatchSize(0);
