Repository: activemq Updated Branches: refs/heads/master 021c82859 -> 6e468b454
AMQ-6947 - Update Queue metrics on expiration The updated dropMessage method only updates the destination metrics if a message is removed from the pagedInMessages list, to prevent duplicate updates. However, the metrics must also be updated when a message never makes it into the pagedInMessages list in the first place, which happens on expiration; this patch fixes that case. A couple of existing tests found this issue. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6e468b45 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6e468b45 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6e468b45 Branch: refs/heads/master Commit: 6e468b4540754cad5cd30de373cadc026c998669 Parents: 021c828 Author: Christopher L. Shannon (cshannon) <[email protected]> Authored: Wed Apr 18 09:04:11 2018 -0400 Committer: Christopher L. Shannon (cshannon) <[email protected]> Committed: Wed Apr 18 09:04:11 2018 -0400 ---------------------------------------------------------------------- .../java/org/apache/activemq/broker/region/Queue.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/6e468b45/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 64f1da2..4aa8f7b 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1872,8 +1872,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index pagedInMessagesLock.writeLock().lock(); try { if 
(pagedInMessages.remove(reference) != null) { - getDestinationStatistics().getDequeues().increment(); - getDestinationStatistics().getMessages().decrement(); + updateMetricsOnMessageDrop(); } } finally { pagedInMessagesLock.writeLock().unlock(); @@ -1881,6 +1880,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } } + private void updateMetricsOnMessageDrop() { + getDestinationStatistics().getDequeues().increment(); + getDestinationStatistics().getMessages().decrement(); + } + public void messageExpired(ConnectionContext context, MessageReference reference) { messageExpired(context, null, reference); } @@ -2037,6 +2041,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index if (processExpired && ref.isExpired()) { if (broker.isExpired(ref)) { messageExpired(createConnectionContext(), ref); + + //We need to update the metrics here because the drop message + //method will only update if the message was removed from the + //pagedInMessages list which won't happen in this case + updateMetricsOnMessageDrop(); } else { ref.decrementReferenceCount(); }
