Repository: activemq-artemis
Updated Branches:
  refs/heads/1.x 6f0babb36 -> 468b8a954
ARTEMIS-1140 Avoid lock on queue for message counts

(cherry picked from commit 33f2ad65c915a8fa2c3606271f106bf5703ace83)

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/534fd809
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/534fd809
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/534fd809

Branch: refs/heads/1.x
Commit: 534fd8093d4bff4b0f883af1a0fbb3df2d701546
Parents: 6f0babb
Author: Clebert Suconic <[email protected]>
Authored: Wed May 3 12:10:59 2017 -0400
Committer: Clebert Suconic <[email protected]>
Committed: Wed May 3 16:41:31 2017 -0400

----------------------------------------------------------------------
 .../artemis/core/server/impl/QueueImpl.java | 32 +++++++++++---------
 1 file changed, 18 insertions(+), 14 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/534fd809/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index d0d1f10..040a996 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -208,6 +208,8 @@ public class QueueImpl implements Queue {
    // We cache the consumers here since we don't want to include the redistributor
+   private final AtomicInteger consumersCount = new AtomicInteger();
+
    private final Set<Consumer> consumerSet = new HashSet<>();
    private final Map<SimpleString, Consumer> groups = new HashMap<>();
@@ -717,7 +719,9 @@ public class QueueImpl implements Queue {
       consumerList.add(new ConsumerHolder(consumer));
-      consumerSet.add(consumer);
+      if (consumerSet.add(consumer)) {
+         consumersCount.incrementAndGet();
+      }
       if (refCountForConsumers != null) {
          refCountForConsumers.increment();
@@ -745,7 +749,9 @@ public class QueueImpl implements Queue {
          pos = consumerList.size() - 1;
       }
-      consumerSet.remove(consumer);
+      if (consumerSet.remove(consumer)) {
+         consumersCount.decrementAndGet();
+      }
       LinkedList<SimpleString> groupsToRemove = null;
@@ -830,8 +836,8 @@ public class QueueImpl implements Queue {
    }
    @Override
-   public synchronized int getConsumerCount() {
-      return consumerSet.size();
+   public int getConsumerCount() {
+      return consumersCount.get();
    }
    @Override
@@ -917,16 +923,14 @@ public class QueueImpl implements Queue {
    @Override
    public long getMessageCount() {
-      synchronized (this) {
-         if (pageSubscription != null) {
-            // messageReferences will have depaged messages which we need to discount from the counter as they are
-            // counted on the pageSubscription as well
-            return messageReferences.size() + getScheduledCount() +
-               deliveringCount.get() +
-               pageSubscription.getMessageCount();
-         } else {
-            return messageReferences.size() + getScheduledCount() + deliveringCount.get();
-         }
+      if (pageSubscription != null) {
+         // messageReferences will have depaged messages which we need to discount from the counter as they are
+         // counted on the pageSubscription as well
+         return messageReferences.size() + getScheduledCount() +
+            deliveringCount.get() +
+            pageSubscription.getMessageCount();
+      } else {
+         return messageReferences.size() + getScheduledCount() + deliveringCount.get();
       }
    }
