This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.15.x by this push:
new 4facedc AMQ-7228 - Avoid unnecessary lock contention when getting
pending metrics
4facedc is described below
commit 4facedccf64724ff8a89f7ea050a4dd8ec8406a9
Author: Christopher L. Shannon (cshannon) <[email protected]>
AuthorDate: Fri Jun 14 10:46:21 2019 -0400
AMQ-7228 - Avoid unnecessary lock contention when getting pending
metrics
(cherry picked from commit dc56fa3f6ea753b692b4b3a9ffacc4f82de6af74)
---
.../broker/region/PrefetchSubscription.java | 4 +--
.../activemq/broker/region/TopicSubscription.java | 42 ++++++++--------------
2 files changed, 15 insertions(+), 31 deletions(-)
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index fc68fc1..fe7e732 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -529,9 +529,7 @@ public abstract class PrefetchSubscription extends
AbstractSubscription {
@Override
public long getPendingMessageSize() {
- synchronized (pendingLock) {
- return pending.messageSize();
- }
+ return pending.messageSize();
}
@Override
diff --git
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index bf3f97b..0bf1c4e 100644
---
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -16,14 +16,6 @@
*/
package org.apache.activemq.broker.region;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.JMSException;
-
import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
@@ -32,15 +24,7 @@ import
org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
-import org.apache.activemq.command.ConsumerControl;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageDispatchNotification;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.Response;
+import org.apache.activemq.command.*;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.transport.TransmitCallback;
@@ -48,6 +32,14 @@ import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.jms.JMSException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
public class TopicSubscription extends AbstractSubscription {
private static final Logger LOG =
LoggerFactory.getLogger(TopicSubscription.class);
@@ -61,7 +53,7 @@ public class TopicSubscription extends AbstractSubscription {
private int maximumPendingMessages = -1;
private MessageEvictionStrategy messageEvictionStrategy = new
OldestMessageEvictionStrategy();
- private int discarded;
+ private final AtomicInteger discarded = new AtomicInteger();
private final Object matchedListMutex = new Object();
private int memoryUsageHighWaterMark = 95;
// allow duplicate suppression in a ring network of brokers
@@ -448,9 +440,7 @@ public class TopicSubscription extends AbstractSubscription
{
@Override
public long getPendingMessageSize() {
- synchronized (matchedListMutex) {
- return matched.messageSize();
- }
+ return matched.messageSize();
}
@Override
@@ -482,9 +472,7 @@ public class TopicSubscription extends AbstractSubscription
{
* @return the number of messages discarded due to being a slow consumer
*/
public int discarded() {
- synchronized (matchedListMutex) {
- return discarded;
- }
+ return discarded.get();
}
/**
@@ -493,9 +481,7 @@ public class TopicSubscription extends AbstractSubscription
{
* prefetch buffer being full).
*/
public int matched() {
- synchronized (matchedListMutex) {
- return matched.size();
- }
+ return matched.size();
}
/**
@@ -727,7 +713,7 @@ public class TopicSubscription extends AbstractSubscription
{
try {
message.decrementReferenceCount();
matched.remove(message);
- discarded++;
+ discarded.incrementAndGet();
if (destination != null) {
destination.getDestinationStatistics().getDequeues().increment();
}