Repository: activemq Updated Branches: refs/heads/activemq-5.15.x d1746e4ad -> 4a99103e7
AMQ-7035 - use NonCachedMessageEvaluationContext in place of MessageEvaluationContext to avoid unnecessary reference count management and subsequent leaks. Rework AMQ-6465 with additional JMX related tests (cherry picked from commit 50d27e7e545d30bc0d35f8dd8baf15b33522c33a) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4a99103e Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4a99103e Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4a99103e Branch: refs/heads/activemq-5.15.x Commit: 4a99103e706653f395e44d4b43a545b80ef664d9 Parents: d1746e4 Author: gtully <[email protected]> Authored: Wed Aug 15 16:21:57 2018 +0100 Committer: Timothy Bish <[email protected]> Committed: Wed Aug 15 11:24:36 2018 -0400 ---------------------------------------------------------------------- .../activemq/broker/ConnectionContext.java | 3 +- .../activemq/broker/jmx/DestinationView.java | 8 +-- .../activemq/broker/region/BaseDestination.java | 2 +- .../cursors/FilePendingMessageCursor.java | 2 +- .../network/DemandForwardingBridgeSupport.java | 7 +-- .../apache/activemq/broker/jmx/MBeanTest.java | 56 ++++++++++++++++---- .../apache/activemq/selector/SelectorTest.java | 2 + .../selector/UnknownHandlingSelectorTest.java | 5 +- .../usecases/LargeQueueSparseDeleteTest.java | 9 ++-- 9 files changed, 63 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/4a99103e/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java b/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java index 8c4db9a..3eba5d8 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java @@ -27,6 +27,7 @@ import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.XATransactionId; import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.security.MessageAuthorizationPolicy; import org.apache.activemq.security.SecurityContext; import org.apache.activemq.state.ConnectionState; @@ -64,7 +65,7 @@ public class ConnectionContext { private XATransactionId xid; public ConnectionContext() { - this.messageEvaluationContext = new MessageEvaluationContext(); + this.messageEvaluationContext = new NonCachedMessageEvaluationContext(); } public ConnectionContext(MessageEvaluationContext messageEvaluationContext) { http://git-wip-us.apache.org/repos/asf/activemq/blob/4a99103e/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index 769d796..3055b57 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -51,7 +51,7 @@ import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.Message; import org.apache.activemq.filter.BooleanExpression; -import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.selector.SelectorParser; import org.apache.activemq.store.MessageStore; import org.apache.activemq.util.URISupport; @@ -213,7 +213,7 @@ public class DestinationView implements DestinationViewMBean { Message[] messages = destination.browse(); ArrayList<CompositeData> c = new ArrayList<CompositeData>(); - MessageEvaluationContext ctx = new MessageEvaluationContext(); + NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext(); ctx.setDestination(destination.getActiveMQDestination()); BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector); @@ -256,7 +256,7 @@ public class DestinationView implements DestinationViewMBean { Message[] messages = destination.browse(); ArrayList<Object> answer = new ArrayList<Object>(); - MessageEvaluationContext ctx = new MessageEvaluationContext(); + NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext(); ctx.setDestination(destination.getActiveMQDestination()); BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector); @@ -297,7 +297,7 @@ public class DestinationView implements DestinationViewMBean { TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] { "JMSMessageID" }); TabularDataSupport rc = new TabularDataSupport(tt); - MessageEvaluationContext ctx = new MessageEvaluationContext(); + NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext(); ctx.setDestination(destination.getActiveMQDestination()); BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector); http://git-wip-us.apache.org/repos/asf/activemq/blob/4a99103e/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index aa2f7b5..10841dd 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -841,7 +841,7 @@ public abstract class BaseDestination implements Destination { } public ConnectionContext createConnectionContext() { - ConnectionContext answer = new ConnectionContext(new NonCachedMessageEvaluationContext()); + ConnectionContext answer = new ConnectionContext(); answer.setBroker(this.broker); answer.getMessageEvaluationContext().setDestination(getActiveMQDestination()); answer.setSecurityContext(SecurityContext.BROKER_SECURITY_CONTEXT); http://git-wip-us.apache.org/repos/asf/activemq/blob/4a99103e/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java index 20b2bc5..f23d817 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java @@ -484,7 +484,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple private void discardExpiredMessage(MessageReference reference) { LOG.debug("Discarding expired message {}", reference); if (reference.isExpired() && broker.isExpired(reference)) { - ConnectionContext context = new ConnectionContext(new NonCachedMessageEvaluationContext()); + ConnectionContext context = new ConnectionContext(); context.setBroker(broker); ((Destination)reference.getRegionDestination()).messageExpired(context, null, new IndirectMessageReference(reference.getMessage())); } http://git-wip-us.apache.org/repos/asf/activemq/blob/4a99103e/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 394cccd..7ce0339 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -86,7 +86,7 @@ import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.filter.DestinationFilter; -import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.apache.activemq.security.SecurityContext; import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.FutureResponse; @@ -1303,13 +1303,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br // for durable subs, suppression via filter leaves dangling acks so we // need to check here and allow the ack irrespective if (sub.getLocalInfo().isDurable()) { - MessageEvaluationContext messageEvalContext = new MessageEvaluationContext(); + NonCachedMessageEvaluationContext messageEvalContext = new NonCachedMessageEvaluationContext(); messageEvalContext.setMessageReference(md.getMessage()); messageEvalContext.setDestination(md.getDestination()); suppress = !sub.getNetworkBridgeFilter().matches(messageEvalContext); - //AMQ-6465 - Need to decrement the reference count after checking matches() as - //the call above will increment the reference count by 1 - messageEvalContext.getMessageReference().decrementReferenceCount(); } return suppress; } http://git-wip-us.apache.org/repos/asf/activemq/blob/4a99103e/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java index 0ccf1cb..8ecfee3 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java @@ -452,15 +452,19 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination); - queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + QueueViewMBean queueNew = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); int movedSize = MESSAGE_COUNT-3; - assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize()); + assertEquals("Unexpected number of messages ",movedSize,queueNew.getQueueSize()); // now lets remove them by selector - queue.removeMatchingMessages("counter > 2"); + queueNew.removeMatchingMessages("counter > 2"); - assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); - assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); + assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queueNew.getQueueSize()); + assertEquals("dest has no memory usage", 0, queueNew.getMemoryPercentUsage()); + assertEquals("dest has 0 memory usage", 0, queueNew.getMemoryUsageByteCount()); + + queue.purge(); + assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount()); } public void testCopyMessagesBySelector() throws Exception { @@ -478,17 +482,47 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + newDestination); - queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + QueueViewMBean queueTwo = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); - LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)"); - assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queue.getQueueSize()); + LOG.info("Queue: " + queueViewMBeanName + " now has: " + queueTwo.getQueueSize() + " message(s)"); + assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queueTwo.getQueueSize()); // now lets remove them by selector - queue.removeMatchingMessages("counter > 2"); + queueTwo.removeMatchingMessages("counter > 2"); - assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); - assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); + assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queueTwo.getQueueSize()); + assertEquals("dest has no memory usage", 0, queueTwo.getMemoryPercentUsage()); + assertEquals("dest has 0 memory usage", 0, queueTwo.getMemoryUsageByteCount()); + + queue.purge(); + assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount()); } + public void testSelectorBrowseUsage() throws Exception { + connection = connectionFactory.createConnection(); + useConnection(connection); + + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString()); + + final String someSelectorExp = "JMSType = '22'"; + QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + queue.browse(someSelectorExp); + queue.purge(); + assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount()); + + connection.close(); + connection = connectionFactory.createConnection(); + useConnection(connection); + queue.browseMessages(someSelectorExp); + queue.purge(); + assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount()); + + connection.close(); + connection = connectionFactory.createConnection(); + useConnection(connection); + queue.browseAsTable(someSelectorExp); + queue.purge(); + assertEquals("dest has 0 memory usage", 0, queue.getMemoryUsageByteCount()); + } public void testCopyPurgeCopyBack() throws Exception { connection = connectionFactory.createConnection(); http://git-wip-us.apache.org/repos/asf/activemq/blob/4a99103e/activemq-unit-tests/src/test/java/org/apache/activemq/selector/SelectorTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/selector/SelectorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/selector/SelectorTest.java index c65674d..66a7af4 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/selector/SelectorTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/selector/SelectorTest.java @@ -405,7 +405,9 @@ public class SelectorTest extends TestCase { MessageEvaluationContext context = new MessageEvaluationContext(); context.setMessageReference((org.apache.activemq.command.Message)message); boolean value = selector.matches(context); + context.clear(); assertEquals("Selector for: " + text, expected, value); + assertEquals("ref 0", 0, ((ActiveMQMessage)message).getReferenceCount()); } protected Message createMessage(String subject) throws JMSException { http://git-wip-us.apache.org/repos/asf/activemq/blob/4a99103e/activemq-unit-tests/src/test/java/org/apache/activemq/selector/UnknownHandlingSelectorTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/selector/UnknownHandlingSelectorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/selector/UnknownHandlingSelectorTest.java index 5c9a8ee..f580b2e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/selector/UnknownHandlingSelectorTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/selector/UnknownHandlingSelectorTest.java @@ -25,7 +25,7 @@ import javax.jms.Message; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.filter.BooleanExpression; -import org.apache.activemq.filter.MessageEvaluationContext; +import org.apache.activemq.filter.NonCachedMessageEvaluationContext; import org.junit.Before; import org.junit.Test; @@ -170,10 +170,11 @@ public class UnknownHandlingSelectorTest { protected void assertSelector(String text, boolean matches) throws JMSException { BooleanExpression selector = SelectorParser.parse(text); assertTrue("Created a valid selector", selector != null); - MessageEvaluationContext context = new MessageEvaluationContext(); + NonCachedMessageEvaluationContext context = new NonCachedMessageEvaluationContext(); context.setMessageReference((org.apache.activemq.command.Message)message); boolean value = selector.matches(context); assertEquals("Selector for: " + text, matches, value); + assertEquals("ref 0", 0, ((ActiveMQMessage)message).getReferenceCount()); } private static String not(String selector) { http://git-wip-us.apache.org/repos/asf/activemq/blob/4a99103e/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/LargeQueueSparseDeleteTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/LargeQueueSparseDeleteTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/LargeQueueSparseDeleteTest.java index f69f1b7..dbd3f61 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/LargeQueueSparseDeleteTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/LargeQueueSparseDeleteTest.java @@ -84,8 +84,7 @@ public class LargeQueueSparseDeleteTest extends EmbeddedBrokerTestSupport { Queue queue = (Queue) broker.getRegionBroker().getDestinationMap().get( destination); - ConnectionContext context = new ConnectionContext( - new NonCachedMessageEvaluationContext()); + ConnectionContext context = new ConnectionContext(); context.setBroker(broker.getBroker()); context.getMessageEvaluationContext().setDestination(destination); @@ -133,8 +132,7 @@ public class LargeQueueSparseDeleteTest extends EmbeddedBrokerTestSupport { Queue queue = (Queue) broker.getRegionBroker().getDestinationMap().get( destination); - ConnectionContext context = new ConnectionContext( - new NonCachedMessageEvaluationContext()); + ConnectionContext context = new ConnectionContext(); context.setBroker(broker.getBroker()); context.getMessageEvaluationContext().setDestination(destination); @@ -179,8 +177,7 @@ public class LargeQueueSparseDeleteTest extends EmbeddedBrokerTestSupport { Queue queue = (Queue) broker.getRegionBroker().getDestinationMap().get( destination); - ConnectionContext context = new ConnectionContext( - new NonCachedMessageEvaluationContext()); + ConnectionContext context = new ConnectionContext(); context.setBroker(broker.getBroker()); context.getMessageEvaluationContext().setDestination(destination);
