Updated Branches: refs/heads/trunk c387e842e -> a64976a37
https://issues.apache.org/jira/browse/AMQ-4930 - fix reference count and limit expiry/browse to memory + 10% Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a64976a3 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a64976a3 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a64976a3 Branch: refs/heads/trunk Commit: a64976a3774eeecc2830bdfc3bf70499f9cfccb1 Parents: c387e84 Author: gtully <[email protected]> Authored: Tue Dec 17 14:42:19 2013 +0000 Committer: gtully <[email protected]> Committed: Wed Dec 18 12:21:02 2013 +0000 ---------------------------------------------------------------------- .../apache/activemq/broker/region/Queue.java | 66 ++++------ .../apache/activemq/broker/region/Topic.java | 3 +- .../cursors/FilePendingMessageCursor.java | 4 - .../org/apache/activemq/bugs/AMQ4930Test.java | 132 +++++++++++++++++++ .../activemq/bugs/TempStoreDataCleanupTest.java | 4 +- .../KahaDBFilePendingMessageCursorTest.java | 1 + 6 files changed, 163 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/a64976a3/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 29b65b2..9b32500 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1029,6 +1029,11 @@ public class Queue extends BaseDestination implements Task, UsageListener { messages.stop(); } + for (MessageReference messageReference : pagedInMessages.values()) { + messageReference.decrementReferenceCount(); + } + pagedInMessages.clear(); + 
systemUsage.getMemoryUsage().removeUsageListener(this); if (memoryUsage != null) { memoryUsage.stop(); @@ -1145,7 +1150,8 @@ public class Queue extends BaseDestination implements Task, UsageListener { public void doBrowse(List<Message> browseList, int max) { final ConnectionContext connectionContext = createConnectionContext(); try { - pageInMessages(true); + // allow some page in even if we are full and producers are blocked on pfc + pageInMessages(!memoryUsage.isFull(110)); List<MessageReference> toExpire = new ArrayList<MessageReference>(); pagedInPendingDispatchLock.writeLock().lock(); @@ -1156,6 +1162,8 @@ public class Queue extends BaseDestination implements Task, UsageListener { if (broker.isExpired(ref)) { LOG.debug("expiring from pagedInPending: {}", ref); messageExpired(connectionContext, ref); + } else { + ref.decrementReferenceCount(); } } } finally { @@ -1179,45 +1187,20 @@ public class Queue extends BaseDestination implements Task, UsageListener { } finally { pagedInMessagesLock.writeLock().unlock(); } + ref.decrementReferenceCount(); } } - if (browseList.size() < getMaxBrowsePageSize()) { - messagesLock.writeLock().lock(); - try { - try { - messages.reset(); - while (messages.hasNext() && browseList.size() < max) { - MessageReference node = messages.next(); - if (node.isExpired()) { - if (broker.isExpired(node)) { - LOG.debug("expiring from messages: {}", node); - messageExpired(connectionContext, createMessageReference(node.getMessage())); - } - messages.remove(); - } else { - messages.rollback(node.getMessageId()); - if (browseList.contains(node.getMessage()) == false) { - browseList.add(node.getMessage()); - } - } - node.decrementReferenceCount(); - } - } finally { - messages.release(); - } - } finally { - messagesLock.writeLock().unlock(); - } - } + // we need a store iterator to walk messages on disk, independent of the cursor which is tracking + // the next message batch } catch (Exception e) { LOG.error("Problem retrieving message for browse", 
e); } } - private void addAll(Collection<? extends MessageReference> refs, List<Message> l, int maxBrowsePageSize, + private void addAll(Collection<? extends MessageReference> refs, List<Message> l, int max, List<MessageReference> toExpire) throws Exception { - for (Iterator<? extends MessageReference> i = refs.iterator(); i.hasNext() && l.size() < getMaxBrowsePageSize();) { + for (Iterator<? extends MessageReference> i = refs.iterator(); i.hasNext() && l.size() < max;) { QueueMessageReference ref = (QueueMessageReference) i.next(); if (ref.isExpired()) { toExpire.add(ref); @@ -1896,27 +1879,30 @@ public class Queue extends BaseDestination implements Task, UsageListener { PendingList resultList = null; int toPageIn = Math.min(getMaxPageSize(), messages.size()); - LOG.debug("{} toPageIn: {}, Inflight: {}, pagedInMessages.size {}, enqueueCount: {}, dequeueCount: {}", + int pagedInPendingSize = 0; + pagedInPendingDispatchLock.readLock().lock(); + try { + pagedInPendingSize = pagedInPendingDispatch.size(); + } finally { + pagedInPendingDispatchLock.readLock().unlock(); + } + + LOG.debug("{} toPageIn: {}, Inflight: {}, pagedInMessages.size {}, pagedInPendingDispatch.size {}, enqueueCount: {}, dequeueCount: {}, memUsage:{}", new Object[]{ destination.getPhysicalName(), toPageIn, destinationStatistics.getInflight().getCount(), pagedInMessages.size(), + pagedInPendingSize, destinationStatistics.getEnqueues().getCount(), - destinationStatistics.getDequeues().getCount() + destinationStatistics.getDequeues().getCount(), + getMemoryUsage().getUsage() }); if (isLazyDispatch() && !force) { // Only page in the minimum number of messages which can be // dispatched immediately. 
toPageIn = Math.min(getConsumerMessageCountBeforeFull(), toPageIn); } - int pagedInPendingSize = 0; - pagedInPendingDispatchLock.readLock().lock(); - try { - pagedInPendingSize = pagedInPendingDispatch.size(); - } finally { - pagedInPendingDispatchLock.readLock().unlock(); - } if (toPageIn > 0 && (force || (!consumers.isEmpty() && pagedInPendingSize < getMaxPageSize()))) { int count = 0; result = new ArrayList<QueueMessageReference>(toPageIn); http://git-wip-us.apache.org/repos/asf/activemq/blob/a64976a3/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index 41eef60..ab0f8ce 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -340,7 +340,8 @@ public class Topic extends BaseDestination implements Task { if (warnOnProducerFlowControl) { warnOnProducerFlowControl = false; - LOG.info("{}, Usage Manager memory limit reached for {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info.", getActiveMQDestination().getQualifiedName()); + LOG.info("{}, Usage Manager memory limit reached {}. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. 
See http://activemq.apache.org/producer-flow-control.html for more info.", + getActiveMQDestination().getQualifiedName(), memoryUsage.getLimit()); } if (!context.isNetworkConnection() && systemUsage.isSendFailIfNoSpace()) { http://git-wip-us.apache.org/repos/asf/activemq/blob/a64976a3/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java index b1767e3..2769e68 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java @@ -411,10 +411,6 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple return true; } - protected boolean isSpaceInMemoryList() { - return hasSpace() && isDiskListEmpty(); - } - protected synchronized void expireOldMessages() { if (!memoryList.isEmpty()) { for (Iterator<MessageReference> iterator = memoryList.iterator(); iterator.hasNext();) { http://git-wip-us.apache.org/repos/asf/activemq/blob/a64976a3/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java new file mode 100644 index 0000000..f75eae3 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.MessageProducer; +import javax.jms.Session; +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ4930Test extends TestCase { + private static final Logger LOG = LoggerFactory.getLogger(AMQ4930Test.class); + final int messageCount = 150; + final int messageSize = 1024*1024; + final ActiveMQQueue bigQueue = new ActiveMQQueue("BIG"); + BrokerService broker; + ActiveMQConnectionFactory factory; + + protected void configureBroker() throws Exception { + broker.setDeleteAllMessagesOnStartup(true); + broker.setAdvisorySupport(false); + broker.getSystemUsage().getMemoryUsage().setLimit(1*1024*1024); + + PolicyMap pMap = new PolicyMap(); + PolicyEntry 
policy = new PolicyEntry(); + // disable expriy processing as this will call browse in parallel + policy.setExpireMessagesPeriod(0); + policy.setMaxPageSize(50); + policy.setMaxBrowsePageSize(50); + pMap.setDefaultEntry(policy); + + broker.setDestinationPolicy(pMap); + } + + public void testBrowsePendingNonPersistent() throws Exception { + doTestBrowsePending(DeliveryMode.NON_PERSISTENT); + } + + public void testBrowsePendingPersistent() throws Exception { + doTestBrowsePending(DeliveryMode.PERSISTENT); + } + + public void doTestBrowsePending(int deliveryMode) throws Exception { + + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(bigQueue); + producer.setDeliveryMode(deliveryMode); + BytesMessage bytesMessage = session.createBytesMessage(); + bytesMessage.writeBytes(new byte[messageSize]); + + for (int i = 0; i < messageCount; i++) { + producer.send(bigQueue, bytesMessage); + LOG.info("Sent: " + i); + } + + final QueueViewMBean queueViewMBean = (QueueViewMBean) + broker.getManagementContext().newProxyInstance(broker.getAdminView().getQueues()[0], QueueViewMBean.class, false); + + LOG.info(queueViewMBean.getName() + " Size: " + queueViewMBean.getEnqueueCount()); + + connection.close(); + + assertFalse("Cache disabled on q", queueViewMBean.isCacheEnabled()); + + // ensure repeated browse does now blow mem + + final Queue underTest = (Queue) ((RegionBroker)broker.getRegionBroker()).getQueueRegion().getDestinationMap().get(bigQueue); + + // do twice to attempt to pull in 2*maxBrowsePageSize which uses up the system memory limit + underTest.browse(); + underTest.browse(); + Runtime.getRuntime().gc(); + long free = Runtime.getRuntime().freeMemory()/1024; + LOG.info("free at start of check: " + free); + // check for memory growth + for (int i=0; i<10; i++) { + LOG.info("free: " + 
Runtime.getRuntime().freeMemory()/1024); + underTest.browse(); + Runtime.getRuntime().gc(); + Runtime.getRuntime().gc(); + assertTrue("No growth: " + Runtime.getRuntime().freeMemory()/1024, Runtime.getRuntime().freeMemory()/1024 >= (free - (free * 0.1))); + } + } + + + protected void setUp() throws Exception { + super.setUp(); + broker = new BrokerService(); + broker.setBrokerName("thisOne"); + configureBroker(); + broker.start(); + factory = new ActiveMQConnectionFactory("vm://thisOne?jms.alwaysSyncSend=true"); + factory.setWatchTopicAdvisories(false); + + } + + protected void tearDown() throws Exception { + super.tearDown(); + if (broker != null) { + broker.stop(); + broker = null; + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/a64976a3/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java index 36dafaf..34df4a3 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/TempStoreDataCleanupTest.java @@ -115,7 +115,7 @@ public class TempStoreDataCleanupTest { public void testIt() throws Exception { int startPercentage = broker.getAdminView().getMemoryPercentUsage(); - LOG.info("MemoryUseage at test start = " + startPercentage); + LOG.info("MemoryUsage at test start = " + startPercentage); for (int i = 0; i < 2; i++) { LOG.info("Started the test iteration: " + i + " using queueName = " + queueName); @@ -146,7 +146,7 @@ public class TempStoreDataCleanupTest { TimeUnit.SECONDS.sleep(2); } - LOG.info("MemoryUseage before awaiting temp store cleanup = " + broker.getAdminView().getMemoryPercentUsage()); + 
LOG.info("MemoryUsage before awaiting temp store cleanup = " + broker.getAdminView().getMemoryPercentUsage()); final PListStoreImpl pa = (PListStoreImpl) broker.getTempDataStore(); assertTrue("only one journal file should be left: " + pa.getJournal().getFileMap().size(), http://git-wip-us.apache.org/repos/asf/activemq/blob/a64976a3/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java index f0338ba..5a3b318 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/KahaDBFilePendingMessageCursorTest.java @@ -74,6 +74,7 @@ public class KahaDBFilePendingMessageCursorTest extends FilePendingMessageCursor while(underTest.hasNext()) { MessageReference ref = underTest.next(); underTest.remove(); + ref.decrementReferenceCount(); assertEquals("id is correct", receivedCount++, ref.getMessageId().getProducerSequenceId()); } assertEquals("got all messages back", receivedCount, numMessages);
