This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.15.x by this push:
     new 4facedc  AMQ-7228 - Avoid unnecessary lock contention when getting pending metrics
4facedc is described below

commit 4facedccf64724ff8a89f7ea050a4dd8ec8406a9
Author: Christopher L. Shannon (cshannon) <[email protected]>
AuthorDate: Fri Jun 14 10:46:21 2019 -0400

    AMQ-7228 - Avoid unnecessary lock contention when getting pending
    metrics
    
    (cherry picked from commit dc56fa3f6ea753b692b4b3a9ffacc4f82de6af74)
---
 .../broker/region/PrefetchSubscription.java        |  4 +--
 .../activemq/broker/region/TopicSubscription.java  | 42 ++++++++--------------
 2 files changed, 15 insertions(+), 31 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index fc68fc1..fe7e732 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -529,9 +529,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
 
     @Override
     public long getPendingMessageSize() {
-        synchronized (pendingLock) {
-            return pending.messageSize();
-        }
+        return pending.messageSize();
     }
 
     @Override
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index bf3f97b..0bf1c4e 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -16,14 +16,6 @@
  */
 package org.apache.activemq.broker.region;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.JMSException;
-
 import org.apache.activemq.ActiveMQMessageAudit;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
@@ -32,15 +24,7 @@ import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
 import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
 import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
-import org.apache.activemq.command.ConsumerControl;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.Response;
+import org.apache.activemq.command.*;
 import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.transaction.Synchronization;
 import org.apache.activemq.transport.TransmitCallback;
@@ -48,6 +32,14 @@ import org.apache.activemq.usage.SystemUsage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.JMSException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
 public class TopicSubscription extends AbstractSubscription {
 
     private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
@@ -61,7 +53,7 @@ public class TopicSubscription extends AbstractSubscription {
 
     private int maximumPendingMessages = -1;
     private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
-    private int discarded;
+    private final AtomicInteger discarded = new AtomicInteger();
     private final Object matchedListMutex = new Object();
     private int memoryUsageHighWaterMark = 95;
     // allow duplicate suppression in a ring network of brokers
@@ -448,9 +440,7 @@ public class TopicSubscription extends AbstractSubscription {
 
     @Override
     public long getPendingMessageSize() {
-        synchronized (matchedListMutex) {
-            return matched.messageSize();
-        }
+        return matched.messageSize();
     }
 
     @Override
@@ -482,9 +472,7 @@ public class TopicSubscription extends AbstractSubscription {
      * @return the number of messages discarded due to being a slow consumer
      */
     public int discarded() {
-        synchronized (matchedListMutex) {
-            return discarded;
-        }
+        return discarded.get();
     }
 
     /**
@@ -493,9 +481,7 @@ public class TopicSubscription extends AbstractSubscription {
      *         prefetch buffer being full).
      */
     public int matched() {
-        synchronized (matchedListMutex) {
-            return matched.size();
-        }
+        return matched.size();
     }
 
     /**
@@ -727,7 +713,7 @@ public class TopicSubscription extends AbstractSubscription {
         try {
             message.decrementReferenceCount();
             matched.remove(message);
-            discarded++;
+            discarded.incrementAndGet();
             if (destination != null) {
                 destination.getDestinationStatistics().getDequeues().increment();
             }

Reply via email to