Modify the fix for https://issues.apache.org/jira/browse/AMQ-4181 - apply maxBrowsePageSize so that, when it is > 0, it limits the number of messages dispatched to a queue browser; related to https://issues.apache.org/jira/browse/AMQ-4487 https://issues.apache.org/jira/browse/AMQ-4372 https://issues.apache.org/jira/browse/AMQ-4595
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/52dc82b0 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/52dc82b0 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/52dc82b0 Branch: refs/heads/activemq-5.9 Commit: 52dc82b000aa7838aab5a9dae2952a9315e6dead Parents: be78a36 Author: gtully <[email protected]> Authored: Tue Oct 22 15:22:36 2013 +0100 Committer: Hadrian Zbarcea <[email protected]> Committed: Tue Mar 11 17:06:12 2014 -0400 ---------------------------------------------------------------------- .../activemq/broker/region/BaseDestination.java | 2 +- .../apache/activemq/broker/region/Queue.java | 4 +- .../broker/region/QueueBrowserSubscription.java | 9 ++ .../broker/region/policy/PolicyEntry.java | 3 + .../apache/activemq/JmsQueueBrowserTest.java | 1 + .../org/apache/activemq/bugs/AMQ4595Test.java | 15 +-- .../usecases/QueueBrowsingLimitTest.java | 113 +++++++++++++++++++ 7 files changed, 137 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/52dc82b0/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 5edbdf3..f350098 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -286,7 +286,7 @@ public abstract class BaseDestination implements Destination { } public int getMaxBrowsePageSize() { - return this.maxBrowsePageSize; + return this.maxBrowsePageSize > 0 ? 
this.maxBrowsePageSize : getMaxPageSize(); } public void setMaxBrowsePageSize(int maxPageSize) { http://git-wip-us.apache.org/repos/asf/activemq/blob/52dc82b0/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 7713d71..5b50f1c 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1605,7 +1605,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, alreadyDispatchedMessages.size()); boolean added = false; for (QueueMessageReference node : alreadyDispatchedMessages) { - if (!node.isAcked() && !browser.isDuplicate(node.getMessageId())) { + if (!node.isAcked() && !browser.isDuplicate(node.getMessageId()) && !browser.atMax()) { msgContext.setMessageReference(node); if (browser.matches(node, msgContext)) { browser.add(node); @@ -1614,7 +1614,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } } // are we done browsing? 
no new messages paged - if (!added) { + if (!added || browser.atMax()) { browser.decrementQueueRef(); browserDispatches.remove(browserDispatch); } http://git-wip-us.apache.org/repos/asf/activemq/blob/52dc82b0/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java index 9bc3c1d..97de921 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java @@ -43,6 +43,7 @@ public class QueueBrowserSubscription extends QueueSubscription { boolean destinationsAdded; private final Map<MessageId, Object> audit = new HashMap<MessageId, Object>(); + private long maxMessages; public QueueBrowserSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException { super(broker, usageManager, context, info); @@ -115,4 +116,12 @@ public class QueueBrowserSubscription extends QueueSubscription { // in case of browser return new ArrayList<MessageReference>(); } + + public boolean atMax() { + return maxMessages > 0 && getEnqueueCounter() >= maxMessages; + } + + public void setMaxMessages(long max) { + maxMessages = max; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/52dc82b0/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index c219b19..9e1b006 100644 --- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -275,6 +275,9 @@ public class PolicyEntry extends DestinationMapEntry { // we can remove this and perform a more efficient dispatch. sub.setMaxProducersToAudit(Integer.MAX_VALUE); sub.setMaxAuditDepth(Short.MAX_VALUE); + + // part solution - dispatching to browsers needs to be restricted + sub.setMaxMessages(getMaxBrowsePageSize()); } public void configure(Broker broker, SystemUsage memoryManager, QueueSubscription sub) { http://git-wip-us.apache.org/repos/asf/activemq/blob/52dc82b0/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java index b98a461..c063e24 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueBrowserTest.java @@ -440,6 +440,7 @@ public class JmsQueueBrowserTest extends JmsTestSupport { PolicyMap policyMap = new PolicyMap(); PolicyEntry policyEntry = new PolicyEntry(); policyEntry.setUseCache(isUseCache); + policyEntry.setMaxBrowsePageSize(4096); policyMap.setDefaultEntry(policyEntry); brokerService.setDestinationPolicy(policyMap); return brokerService; http://git-wip-us.apache.org/repos/asf/activemq/blob/52dc82b0/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java index 04d3620..0baf5c3 100644 --- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java @@ -34,6 +34,8 @@ import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQQueue; import org.junit.After; import org.junit.Before; @@ -55,13 +57,12 @@ public class AMQ4595Test { TransportConnector connector = broker.addConnector("vm://localhost"); broker.deleteAllMessages(); -// PolicyEntry policy = new PolicyEntry(); -// policy.setQueue(">"); -// policy.setMaxAuditDepth(16384); -// policy.setCursorMemoryHighWaterMark(95); // More breathing room. -// PolicyMap pMap = new PolicyMap(); -// pMap.setDefaultEntry(policy); -// broker.setDestinationPolicy(pMap); + //PolicyMap pMap = new PolicyMap(); + //PolicyEntry policyEntry = new PolicyEntry(); + //policyEntry.setMaxBrowsePageSize(10000); + //pMap.put(new ActiveMQQueue(">"), policyEntry); + // when no policy match, browserSub has maxMessages==0 + //broker.setDestinationPolicy(pMap); broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024); broker.start(); http://git-wip-us.apache.org/repos/asf/activemq/blob/52dc82b0/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingLimitTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingLimitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingLimitTest.java new file mode 100644 index 0000000..15df78c --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueBrowsingLimitTest.java @@ -0,0 +1,113 @@ +/** + 
* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import java.io.IOException; +import java.net.URI; +import java.util.Enumeration; +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.assertEquals; + +public class QueueBrowsingLimitTest { + + private static final Logger LOG = LoggerFactory.getLogger(QueueBrowsingLimitTest.class); + + private BrokerService broker; + private URI connectUri; + private ActiveMQConnectionFactory factory; + private final int browserLimit = 300; + + + @Before + public void startBroker() throws Exception { + broker = createBroker(); + TransportConnector connector = 
broker.addConnector("tcp://0.0.0.0:0"); + broker.deleteAllMessages(); + broker.start(); + broker.waitUntilStarted(); + + PolicyEntry policy = new PolicyEntry(); + policy.setMaxBrowsePageSize(browserLimit); + broker.setDestinationPolicy(new PolicyMap()); + broker.getDestinationPolicy().setDefaultEntry(policy); + + connectUri = connector.getConnectUri(); + factory = new ActiveMQConnectionFactory(connectUri); + + } + + public BrokerService createBroker() throws IOException { + return new BrokerService(); + } + + @After + public void stopBroker() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + } + + @Test + public void testBrowsingLimited() throws Exception { + + int messageToSend = 470; + + ActiveMQQueue queue = new ActiveMQQueue("TEST"); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(queue); + + String data = ""; + for( int i=0; i < 1024*2; i++ ) { + data += "x"; + } + + for( int i=0; i < messageToSend; i++ ) { + producer.send(session.createTextMessage(data)); + } + + QueueBrowser browser = session.createBrowser(queue); + Enumeration<?> enumeration = browser.getEnumeration(); + int received = 0; + while (enumeration.hasMoreElements()) { + Message m = (Message) enumeration.nextElement(); + received++; + LOG.info("Browsed message " + received + ": " + m.getJMSMessageID()); + } + + browser.close(); + + assertEquals(browserLimit, received); + } +}
