Repository: activemq Updated Branches: refs/heads/master e91f5c806 -> 8493f6b4d
AMQ-6459 - store based % usage needs to poll the store for the current value on each access Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8493f6b4 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8493f6b4 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8493f6b4 Branch: refs/heads/master Commit: 8493f6b4d79146509e554e2b640f486d79f40bbb Parents: e91f5c8 Author: gtully <[email protected]> Authored: Fri Oct 7 14:31:52 2016 +0100 Committer: gtully <[email protected]> Committed: Fri Oct 7 14:31:52 2016 +0100 ---------------------------------------------------------------------- .../org/apache/activemq/usage/TempUsage.java | 13 ++ .../org/apache/activemq/bugs/AMQ6459Test.java | 213 +++++++++++++++++++ 2 files changed, 226 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/8493f6b4/activemq-broker/src/main/java/org/apache/activemq/usage/TempUsage.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/usage/TempUsage.java b/activemq-broker/src/main/java/org/apache/activemq/usage/TempUsage.java index f068dbe..885afbe 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/usage/TempUsage.java +++ b/activemq-broker/src/main/java/org/apache/activemq/usage/TempUsage.java @@ -51,6 +51,19 @@ public class TempUsage extends PercentLimitUsage<TempUsage> { } @Override + public int getPercentUsage() { + if (store != null) { + usageLock.writeLock().lock(); + try { + percentUsage = caclPercentUsage(); + } finally { + usageLock.writeLock().unlock(); + } + } + return super.getPercentUsage(); + } + + @Override protected long retrieveUsage() { if (store == null) { return 0; http://git-wip-us.apache.org/repos/asf/activemq/blob/8493f6b4/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6459Test.java 
/*
 * Diff header from the commit mail for this file (the commit added it as a new file):
 *
 * ----------------------------------------------------------------------
 * diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6459Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6459Test.java
 * new file mode 100644
 * index 0000000..1734d6a
 * --- /dev/null
 * +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6459Test.java
 * @@ -0,0 +1,213 @@
 */
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.bugs;


import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.store.kahadb.plist.PListStoreImpl;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertTrue;

/**
 * Ensure the tempPercentUsage JMX attribute decreases after temp store usage is decreased.
 *
 * <p>The test subscribes to a topic without consuming, publishes {@link #MESSAGES_TO_SEND}
 * messages, waits for the broker's temp percent usage to exceed
 * {@link #HIGH_USAGE_PERCENT}, closes the consumer and then waits for the reported
 * percentage to fall below the value observed while the consumer was open.
 */
public class AMQ6459Test {

    private static final Logger LOG = LoggerFactory.getLogger(AMQ6459Test.class);

    private static final String DESTINATION = "testQ1";
    private static final int MESSAGES_TO_SEND = 4000;

    /** Number of characters in the text payload of every message sent. */
    private static final int PAYLOAD_CHARS = 4 * 1024;

    /** Temp percent usage that must be exceeded before the consumer is closed. */
    private static final int HIGH_USAGE_PERCENT = 50;

    /**
     * Bind URL before the broker starts (port 0); replaced by the connector's publishable
     * connect string once the broker is running.
     */
    private String transportUrl = "tcp://0.0.0.0:0";

    private BrokerService broker;

    /** Connection backing the consumer from {@link #createConsumer()}; closed in {@link #stopBroker()}. */
    private Connection consumerConnection;

    /**
     * Starts a JMX-enabled broker with a 50 MiB temp usage limit, an 800 KiB memory usage
     * limit and a temp store using 24 KiB journal files and a 2000 cleanup interval.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    @Before
    public void createBroker() throws Exception {
        broker = new BrokerService();
        broker.setDeleteAllMessagesOnStartup(true);
        broker.setUseJmx(true);

        // Single policy entry applied to every topic (">").
        PolicyEntry topicPolicy = new PolicyEntry();
        topicPolicy.setTopicPrefetch(50);
        topicPolicy.setTopic(">");
        List<PolicyEntry> entries = new ArrayList<PolicyEntry>();
        entries.add(topicPolicy);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(entries);
        broker.setDestinationPolicy(policyMap);

        SystemUsage systemUsage = broker.getSystemUsage();
        systemUsage.getTempUsage().setLimit(50 * 1024 * 1024);
        // NOTE(review): the small memory limit presumably forces pending messages into the
        // temp store - confirm against the broker's cursor behaviour.
        systemUsage.getMemoryUsage().setLimit(800 * 1024);

        PListStoreImpl pListStore = (PListStoreImpl) broker.getTempDataStore();
        pListStore.setJournalMaxFileLength(24 * 1024);
        pListStore.setCleanupInterval(2000);

        broker.addConnector(transportUrl);

        broker.start();
        broker.waitUntilStarted();
        transportUrl = broker.getTransportConnectorByScheme("tcp").getPublishableConnectString();
    }

    /**
     * Closes the consumer connection, if one was opened, and stops the broker.
     *
     * <p>The broker is stopped in a {@code finally} block so that a failure while closing the
     * client connection cannot leave it running.
     *
     * @throws Exception if closing the connection or stopping the broker fails
     */
    @After
    public void stopBroker() throws Exception {
        try {
            if (consumerConnection != null) {
                consumerConnection.close();
            }
        } finally {
            consumerConnection = null;
            // Guard against a failure in createBroker() before the field was assigned.
            if (broker != null) {
                broker.stop();
                broker.waitUntilStopped();
            }
        }
    }

    /**
     * Verifies that the temp percent usage reported by the broker view drops once the
     * non-consuming topic subscriber is closed.
     *
     * @throws Exception on any JMS or broker failure
     */
    @Test
    public void testTempPercentUsageDecreases() throws Exception {

        // create a topic subscriber, but do not consume messages
        MessageConsumer messageConsumer = createConsumer();

        // publish the messages the idle subscriber will leave pending
        sendMessages(MESSAGES_TO_SEND);

        final BrokerViewMBean brokerView = getBrokerView(broker);

        LOG.info("tempPercentageUsage is {}", brokerView.getTempPercentUsage());

        // Assert on the wait result so a timeout is reported as such.
        assertTrue("tempPercentageUsage should exceed " + HIGH_USAGE_PERCENT + " while the consumer is idle",
                Wait.waitFor(new Wait.Condition() {
                    @Override
                    public boolean isSatisified() throws Exception {
                        // Read once so the logged value is the one that is evaluated.
                        int current = brokerView.getTempPercentUsage();
                        LOG.info("tempPercentageUsage now {}", current);
                        return current > HIGH_USAGE_PERCENT;
                    }
                }));

        final int tempPercentUsageWithConsumer = brokerView.getTempPercentUsage();

        // ensure the tempPercentageUsage is at a high number
        assertTrue(" tempPercentageUsage ", (HIGH_USAGE_PERCENT < tempPercentUsageWithConsumer));

        // close the consumer, releasing the temp storage
        messageConsumer.close();

        assertTrue("tempPercentageUsage should be less after consumer has closed",
                Wait.waitFor(new Wait.Condition() {
                    @Override
                    public boolean isSatisified() throws Exception {
                        int current = brokerView.getTempPercentUsage();
                        LOG.info("tempPercentageUsage now (after consumer closed) {}", current);
                        return tempPercentUsageWithConsumer > current;
                    }
                }));
    }

    /**
     * Opens a started connection and creates an auto-acknowledge consumer on the test topic.
     *
     * <p>The connection is stored in {@link #consumerConnection} immediately after creation so
     * that {@link #stopBroker()} closes it even if a later step in this method throws.
     *
     * @return a consumer subscribed to {@link #DESTINATION}
     * @throws Exception if the connection, session or consumer cannot be created
     */
    private MessageConsumer createConsumer() throws Exception {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", transportUrl);
        consumerConnection = connectionFactory.createConnection();
        consumerConnection.start();

        Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic(DESTINATION);
        return session.createConsumer(destination);
    }

    /**
     * Publishes the same text message {@code messageCount} times to the test topic, committing
     * the transacted session after every send.
     *
     * <p>Producing is best effort: an exception raised while creating the session or sending is
     * logged and the number of messages sent so far is returned. A failure to start or close the
     * connection is propagated. The connection is always closed.
     *
     * @param messageCount number of messages to send
     * @return number of messages sent and committed
     * @throws Exception if the connection cannot be created, started or closed
     */
    private long sendMessages(int messageCount) throws Exception {

        long numberOfMessageSent = 0;

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", transportUrl);
        Connection connection = connectionFactory.createConnection();
        try {
            connection.start();

            // PAYLOAD_CHARS NUL characters; built from a char[] so no charset is involved.
            final String blob = new String(new char[PAYLOAD_CHARS]);
            try {
                Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
                MessageProducer jmsProducer = producerSession.createProducer(producerSession.createTopic(DESTINATION));

                Message sendMessage = producerSession.createTextMessage(blob);

                for (int i = 0; i < messageCount; i++) {
                    jmsProducer.send(sendMessage);
                    producerSession.commit();
                    numberOfMessageSent++;
                }

                LOG.info(" Finished after producing : {}", numberOfMessageSent);
            } catch (Exception ex) {
                LOG.info("Exception received producing ", ex);
                LOG.info("finishing after exception :{}", numberOfMessageSent);
            }
            return numberOfMessageSent;
        } finally {
            connection.close();
        }
    }

    /**
     * Returns the admin view of the given broker.
     *
     * @param broker broker whose admin view is requested
     * @return the value of {@code broker.getAdminView()}
     * @throws Exception if the admin view cannot be obtained
     */
    private BrokerViewMBean getBrokerView(BrokerService broker) throws Exception {
        return broker.getAdminView();
    }

}
