http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4582Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4582Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4582Test.java new file mode 100644 index 0000000..1c34982 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4582Test.java @@ -0,0 +1,91 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.bugs;

import java.io.IOException;

import javax.jms.Connection;
import javax.jms.Session;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.ConsumerThread;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verifies that starting a broker whose SSL connector names an unknown cipher
 * suite fails with an {@link IOException} whose message mentions the offending
 * {@code enabledCipherSuites} setting.
 */
public class AMQ4582Test {

    // 'transient' dropped: it has no meaning on a static field.
    private static final Logger LOG = LoggerFactory.getLogger(AMQ4582Test.class);

    BrokerService broker;
    Connection connection;
    Session session;

    public static final String KEYSTORE_TYPE = "jks";
    public static final String PASSWORD = "password";
    public static final String SERVER_KEYSTORE = "src/test/resources/server.keystore";
    public static final String TRUST_KEYSTORE = "src/test/resources/client.keystore";

    public static final int PRODUCER_COUNT = 10;
    public static final int CONSUMER_COUNT = 10;
    public static final int MESSAGE_COUNT = 1000;

    final ConsumerThread[] consumers = new ConsumerThread[CONSUMER_COUNT];

    /**
     * Points the JVM-wide SSL system properties at the test key and trust stores.
     *
     * NOTE(review): these properties are not restored after the test, so they
     * remain set for any test that later runs in the same JVM.
     */
    @Before
    public void setUp() throws Exception {
        System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
        System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
        System.setProperty("javax.net.ssl.trustStoreType", KEYSTORE_TYPE);
        System.setProperty("javax.net.ssl.keyStore", SERVER_KEYSTORE);
        System.setProperty("javax.net.ssl.keyStoreType", KEYSTORE_TYPE);
        System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
    }

    /**
     * Stops the broker if the test created one. Stopping is best-effort: a
     * failure here must not mask the test outcome, but it is logged instead of
     * being silently discarded.
     */
    @After
    public void tearDown() throws Exception {
        if (broker != null) {
            try {
                broker.stop();
            } catch (Exception ignored) {
                LOG.warn("Ignoring failure while stopping broker in tearDown", ignored);
            }
        }
    }

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /**
     * Adds an SSL connector configured with the bogus cipher suite "BADSUITE"
     * and expects the resulting exception to be an {@link IOException} that
     * names the bad setting.
     */
    @Test
    public void simpleTest() throws Exception {
        thrown.expect(IOException.class);
        thrown.expectMessage("enabledCipherSuites=BADSUITE");

        broker = new BrokerService();
        broker.setPersistent(false);
        broker.setUseJmx(false);
        try {
            broker.addConnector(
                "ssl://localhost:0?transport.needClientAuth=true&transport.enabledCipherSuites=BADSUITE");
            broker.start();
            broker.waitUntilStarted();
        } catch (Exception e) {
            // Log for diagnosis, then rethrow so the ExpectedException rule can inspect it.
            LOG.info("BrokerService threw:", e);
            throw e;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java new file mode 100644 index 0000000..507e52e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import java.net.URI; +import java.util.Date; +import java.util.Enumeration; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.QueueBrowser; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.assertEquals; + +public class AMQ4595Test { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ4595Test.class); + + private BrokerService broker; + private URI connectUri; + private ActiveMQConnectionFactory factory; + + @Before + public void startBroker() throws Exception { + broker = new BrokerService(); + TransportConnector connector = broker.addConnector("vm://localhost"); + broker.deleteAllMessages(); + + //PolicyMap pMap = new PolicyMap(); + //PolicyEntry policyEntry = new PolicyEntry(); + //policyEntry.setMaxBrowsePageSize(10000); + //pMap.put(new ActiveMQQueue(">"), policyEntry); + // when no policy match, browserSub has maxMessages==0 + //broker.setDestinationPolicy(pMap); + + broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024); + broker.start(); + broker.waitUntilStarted(); + connectUri = connector.getConnectUri(); + factory = new ActiveMQConnectionFactory(connectUri); + } + + @After + public void stopBroker() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + } + + @Test(timeout=120000) + public void testBrowsingSmallBatch() throws JMSException { + doTestBrowsing(100); + } + + @Test(timeout=160000) + public void testBrowsingMediumBatch() throws 
JMSException { + doTestBrowsing(1000); + } + + @Test(timeout=300000) + public void testBrowsingLargeBatch() throws JMSException { + doTestBrowsing(10000); + } + + private void doTestBrowsing(int messageToSend) throws JMSException { + ActiveMQQueue queue = new ActiveMQQueue("TEST"); + + // Send the messages to the Queue. + ActiveMQConnection producerConnection = (ActiveMQConnection) factory.createConnection(); + producerConnection.setUseAsyncSend(true); + producerConnection.start(); + Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = producerSession.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + for (int i = 1; i <= messageToSend; i++) { + String msgStr = provideMessageText(i, 8192); + producer.send(producerSession.createTextMessage(msgStr)); + if ((i % 1000) == 0) { + LOG.info("P&C: {}", msgStr.substring(0, 100)); + } + } + producerConnection.close(); + + LOG.info("Mem usage after producer done: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage() + "%"); + + // Browse the queue. + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + + QueueBrowser browser = session.createBrowser(queue); + Enumeration<?> enumeration = browser.getEnumeration(); + int browsed = 0; + while (enumeration.hasMoreElements()) { + TextMessage m = (TextMessage) enumeration.nextElement(); + browsed++; + if ((browsed % 1000) == 0) { + LOG.info("B[{}]: {}", browsed, m.getText().substring(0, 100)); + } + } + browser.close(); + session.close(); + connection.close(); + + LOG.info("Mem usage after browser closed: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage() + "%"); + + // The number of messages browsed should be equal to the number of messages sent. 
+ assertEquals(messageToSend, browsed); + + browser.close(); + } + + public String provideMessageText(int messageNumber, int messageSize) { + StringBuilder buf = new StringBuilder(); + buf.append("Message: "); + if (messageNumber > 0) { + buf.append(messageNumber); + } + buf.append(" sent at: ").append(new Date()); + + if (buf.length() > messageSize) { + return buf.substring(0, messageSize); + } + for (int i = buf.length(); i < messageSize; i++) { + buf.append(' '); + } + return buf.toString(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java new file mode 100644 index 0000000..265b692 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.net.URI; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Map.Entry; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import junit.framework.Test; +import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.ManagementContext; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ4607Test extends JmsMultipleBrokersTestSupport implements UncaughtExceptionHandler { + private static final Logger LOG = LoggerFactory.getLogger(AMQ4607Test.class); + + public static final int BROKER_COUNT = 3; + public static final int CONSUMER_COUNT = 1; + public static final int MESSAGE_COUNT = 0; + public static final boolean CONDUIT = true; + public static final int TIMEOUT = 20000; + + public boolean duplex = true; + protected Map<String, MessageConsumer> consumerMap; + Map<Thread, Throwable> unhandeledExceptions = new HashMap<Thread, Throwable>(); + + private void assertNoUnhandeledExceptions() { + for( Entry<Thread, Throwable> e: unhandeledExceptions.entrySet()) { + LOG.error("Thread:" + e.getKey() + " Had unexpected: " + e.getValue()); + } + assertTrue("There are no unhandelled exceptions, see: log for detail on: " + unhandeledExceptions, + unhandeledExceptions.isEmpty()); + } + + public NetworkConnector bridge(String from, String to) throws Exception { + NetworkConnector networkConnector = bridgeBrokers(from, to, true, -1, CONDUIT); + 
networkConnector.setSuppressDuplicateQueueSubscriptions(true); + networkConnector.setDecreaseNetworkConsumerPriority(true); + networkConnector.setConsumerTTL(1); + networkConnector.setDuplex(duplex); + return networkConnector; + } + + public static Test suite() { + return suite(AMQ4607Test.class); + } + + public void initCombos() { + addCombinationValues("duplex", new Boolean[]{Boolean.TRUE, Boolean.FALSE}); + } + + public void testMigratingConsumer() throws Exception { + bridge("Broker0", "Broker1"); + if (!duplex) bridge("Broker1", "Broker0"); + + bridge("Broker1", "Broker2"); + if (!duplex) bridge("Broker2", "Broker1"); + + bridge("Broker0", "Broker2"); + if (!duplex) bridge("Broker2", "Broker0"); + + startAllBrokers(); + this.waitForBridgeFormation(); + + Destination dest = createDestination("TEST.FOO", false); + sendMessages("Broker0", dest, 1); + + for (int i=0; i< BROKER_COUNT; i++) { + MessageConsumer messageConsumer = createConsumer("Broker" + i, dest, "DoNotConsume = 'true'"); + + for (int J = 0; J < BROKER_COUNT; J++) { + assertExactConsumersConnect("Broker" + J, dest, CONSUMER_COUNT, TIMEOUT); + } + + assertNoUnhandeledExceptions(); + + assertExactMessageCount("Broker" + i, dest, 1, TIMEOUT); + + messageConsumer.close(); + LOG.info("Check for no consumers.."); + for (int J = 0; J < BROKER_COUNT; J++) { + assertExactConsumersConnect("Broker" + J, dest, 0, TIMEOUT); + } + } + + // now consume the message + final String brokerId = "Broker2"; + MessageConsumer messageConsumer = createConsumer(brokerId, dest); + assertTrue("Consumed ok", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return brokers.get(brokerId).allMessages.getMessageIds().size() == 1; + } + })); + messageConsumer.close(); + + } + + public void testMigratingConsumerFullCircle() throws Exception { + bridge("Broker0", "Broker1"); + if (!duplex) bridge("Broker1", "Broker0"); + + bridge("Broker1", "Broker2"); + if (!duplex) bridge("Broker2", 
"Broker1"); + + bridge("Broker0", "Broker2"); + if (!duplex) bridge("Broker2", "Broker0"); + + // allow full loop, immediate replay back to 0 from 2 + ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory(); + conditionalNetworkBridgeFilterFactory.setReplayDelay(0); + conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true); + brokers.get("Broker2").broker.getDestinationPolicy().getDefaultEntry().setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory); + startAllBrokers(); + this.waitForBridgeFormation(); + + Destination dest = createDestination("TEST.FOO", false); + + sendMessages("Broker0", dest, 1); + + for (int i=0; i< BROKER_COUNT; i++) { + MessageConsumer messageConsumer = createConsumer("Broker" + i, dest, "DoNotConsume = 'true'"); + + for (int J = 0; J < BROKER_COUNT; J++) { + assertExactConsumersConnect("Broker" + J, dest, CONSUMER_COUNT, TIMEOUT); + } + + assertNoUnhandeledExceptions(); + + // validate the message has been forwarded + assertExactMessageCount("Broker" + i, dest, 1, TIMEOUT); + + messageConsumer.close(); + LOG.info("Check for no consumers.."); + for (int J = 0; J < BROKER_COUNT; J++) { + assertExactConsumersConnect("Broker" + J, dest, 0, TIMEOUT); + } + } + + // now consume the message from the origin + LOG.info("Consume from origin..."); + final String brokerId = "Broker0"; + MessageConsumer messageConsumer = createConsumer(brokerId, dest); + assertTrue("Consumed ok", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return brokers.get(brokerId).allMessages.getMessageIds().size() == 1; + } + })); + messageConsumer.close(); + + } + + protected void assertExactMessageCount(final String brokerName, Destination destination, final int count, long timeout) throws Exception { + ManagementContext context = brokers.get(brokerName).broker.getManagementContext(); + final QueueViewMBean queueViewMBean = 
(QueueViewMBean) context.newProxyInstance(brokers.get(brokerName).broker.getAdminView().getQueues()[0], QueueViewMBean.class, false); + assertTrue("Excepected queue depth: " + count + " on: " + brokerName, Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + long currentCount = queueViewMBean.getQueueSize(); + LOG.info("On " + brokerName + " current queue size for " + queueViewMBean + ", " + currentCount); + if (count != currentCount) { + LOG.info("Sub IDs: " + Arrays.asList(queueViewMBean.getSubscriptions())); + } + return currentCount == count; + } + }, timeout)); + } + + protected void assertExactConsumersConnect(final String brokerName, Destination destination, final int count, long timeout) throws Exception { + final ManagementContext context = brokers.get(brokerName).broker.getManagementContext(); + assertTrue("Excepected consumers count: " + count + " on: " + brokerName, Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + try { + QueueViewMBean queueViewMBean = (QueueViewMBean) context.newProxyInstance(brokers.get(brokerName).broker.getAdminView().getQueues()[0], QueueViewMBean.class, false); + long currentCount = queueViewMBean.getConsumerCount(); + LOG.info("On " + brokerName + " current consumer count for " + queueViewMBean + ", " + currentCount); + if (count != currentCount) { + LOG.info("Sub IDs: " + Arrays.asList(queueViewMBean.getSubscriptions())); + } + return currentCount == count; + } catch (Exception e) { + LOG.warn("Unexpected: " + e, e); + return false; + } + } + }, timeout)); + } + + public void setUp() throws Exception { + super.setUp(); + + unhandeledExceptions.clear(); + Thread.setDefaultUncaughtExceptionHandler(this); + + // Setup n brokers + for (int i = 0; i < BROKER_COUNT; i++) { + createBroker(new URI("broker:(tcp://localhost:6161" + i + ")/Broker" + i + "?persistent=false&useJmx=true")); + } + + consumerMap = new LinkedHashMap<String, 
MessageConsumer>(); + } + + @Override + protected void configureBroker(BrokerService brokerService) { + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setExpireMessagesPeriod(0); + PolicyMap policyMap = new PolicyMap(); + policyMap.setDefaultEntry(policyEntry); + brokerService.setDestinationPolicy(policyMap); + } + + public void uncaughtException(Thread t, Throwable e) { + synchronized(unhandeledExceptions) { + unhandeledExceptions.put(t,e); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java new file mode 100644 index 0000000..014d86a --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4636Test.java @@ -0,0 +1,267 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.concurrent.CountDownLatch; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.jms.TopicSubscriber; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.store.jdbc.DataSourceServiceSupport; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; +import org.apache.activemq.store.jdbc.LeaseDatabaseLocker; +import org.apache.activemq.store.jdbc.TransactionContext; +import org.apache.activemq.util.IOHelper; +import org.apache.activemq.util.LeaseLockerIOExceptionHandler; +import org.apache.derby.jdbc.EmbeddedDataSource; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import static org.junit.Assert.fail; + +/** + * Testing how the broker reacts when a SQL Exception is thrown from + * org.apache.activemq.store.jdbc.TransactionContext.executeBatch(). 
+ * <p/> + * see https://issues.apache.org/jira/browse/AMQ-4636 + */ +public class AMQ4636Test { + + private static final String MY_TEST_TOPIC = "MY_TEST_TOPIC"; + private static final Logger LOG = LoggerFactory + .getLogger(AMQ4636Test.class); + private String transportUrl = "tcp://0.0.0.0:0"; + private BrokerService broker; + EmbeddedDataSource embeddedDataSource; + CountDownLatch throwSQLException = new CountDownLatch(0); + + @Before + public void startBroker() throws Exception { + broker = createBroker(); + broker.deleteAllMessages(); + broker.start(); + broker.waitUntilStarted(); + LOG.info("Broker started..."); + } + + @After + public void stopBroker() throws Exception { + if (broker != null) { + LOG.info("Stopping broker..."); + broker.stop(); + broker.waitUntilStopped(); + } + try { + if (embeddedDataSource != null) { + // ref http://svn.apache.org/viewvc/db/derby/code/trunk/java/testing/org/apache/derbyTesting/junit/JDBCDataSource.java?view=markup + embeddedDataSource.setShutdownDatabase("shutdown"); + embeddedDataSource.getConnection(); + } + } catch (Exception ignored) { + } finally { + embeddedDataSource.setShutdownDatabase(null); + } + } + + protected BrokerService createBroker() throws Exception { + + embeddedDataSource = (EmbeddedDataSource) DataSourceServiceSupport.createDataSource(IOHelper.getDefaultDataDirectory()); + embeddedDataSource.setCreateDatabase("create"); + embeddedDataSource.getConnection().close(); + + //wire in a TestTransactionContext (wrapper to TransactionContext) that has an executeBatch() + // method that can be configured to throw a SQL exception on demand + JDBCPersistenceAdapter jdbc = new TestJDBCPersistenceAdapter(); + jdbc.setDataSource(embeddedDataSource); + + jdbc.setLockKeepAlivePeriod(1000l); + LeaseDatabaseLocker leaseDatabaseLocker = new LeaseDatabaseLocker(); + leaseDatabaseLocker.setLockAcquireSleepInterval(2000l); + jdbc.setLocker(leaseDatabaseLocker); + + broker = new BrokerService(); + PolicyMap policyMap = new 
PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setExpireMessagesPeriod(0); + policyMap.setDefaultEntry(defaultEntry); + broker.setDestinationPolicy(policyMap); + broker.setPersistenceAdapter(jdbc); + + broker.setIoExceptionHandler(new LeaseLockerIOExceptionHandler()); + + transportUrl = broker.addConnector(transportUrl).getPublishableConnectString(); + return broker; + } + + /** + * adding a TestTransactionContext (wrapper to TransactionContext) so an SQLException is triggered + * during TransactionContext.executeBatch() when called in the broker. + * <p/> + * Expectation: SQLException triggers a connection shutdown and failover should kick and try to redeliver the + * message. SQLException should NOT be returned to client + */ + @Test + public void testProducerWithDBShutdown() throws Exception { + + // failover but timeout in 1 seconds so the test does not hang + String failoverTransportURL = "failover:(" + transportUrl + + ")?timeout=1000"; + + this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL); + + this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, false, false); + + } + + @Test + public void testTransactedProducerCommitWithDBShutdown() throws Exception { + + // failover but timeout in 1 seconds so the test does not hang + String failoverTransportURL = "failover:(" + transportUrl + + ")?timeout=1000"; + + this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL); + + try { + this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, true, true); + fail("Expect rollback after failover - inddoubt commit"); + } catch (javax.jms.TransactionRolledBackException expectedInDoubt) { + LOG.info("Got rollback after failover failed commit", expectedInDoubt); + } + } + + @Test + public void testTransactedProducerRollbackWithDBShutdown() throws Exception { + + // failover but timeout in 1 seconds so the test does not hang + String failoverTransportURL = "failover:(" + transportUrl + + ")?timeout=1000"; + + 
this.createDurableConsumer(MY_TEST_TOPIC, failoverTransportURL); + + this.sendMessage(MY_TEST_TOPIC, failoverTransportURL, true, false); + } + + public void createDurableConsumer(String topic, + String transportURL) throws JMSException { + Connection connection = null; + LOG.info("*** createDurableConsumer() called ..."); + + try { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + transportURL); + + connection = factory.createConnection(); + connection.setClientID("myconn1"); + Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createTopic(topic); + + TopicSubscriber topicSubscriber = session.createDurableSubscriber( + (Topic) destination, "MySub1"); + } finally { + if (connection != null) { + connection.close(); + } + } + } + + public void sendMessage(String topic, String transportURL, boolean transacted, boolean commit) + throws JMSException { + Connection connection = null; + + try { + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( + transportURL); + + connection = factory.createConnection(); + Session session = connection.createSession(transacted, + transacted ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createTopic(topic); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + Message m = session.createTextMessage("testMessage"); + LOG.info("*** send message to broker..."); + + // trigger SQL exception in transactionContext + throwSQLException = new CountDownLatch(1); + producer.send(m); + + if (transacted) { + if (commit) { + session.commit(); + } else { + session.rollback(); + } + } + + LOG.info("*** Finished send message to broker"); + + } finally { + if (connection != null) { + connection.close(); + } + } + } + + /* + * Mock classes used for testing + */ + + public class TestJDBCPersistenceAdapter extends JDBCPersistenceAdapter { + + public TransactionContext getTransactionContext() throws IOException { + return new TestTransactionContext(this); + } + } + + public class TestTransactionContext extends TransactionContext { + + public TestTransactionContext( + JDBCPersistenceAdapter jdbcPersistenceAdapter) + throws IOException { + super(jdbcPersistenceAdapter); + } + + @Override + public void executeBatch() throws SQLException { + if (throwSQLException.getCount() > 0) { + // only throw exception once + throwSQLException.countDown(); + throw new SQLException("TEST SQL EXCEPTION"); + } + super.executeBatch(); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4656Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4656Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4656Test.java new file mode 100644 index 0000000..fcdf23e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4656Test.java @@ -0,0 +1,153 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.util.Arrays; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.BrokerView; +import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean; +import org.apache.activemq.broker.region.policy.FilePendingDurableSubscriberMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@RunWith(value = Parameterized.class) +public class AMQ4656Test { + + private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4656Test.class); + private static BrokerService brokerService; + private static String BROKER_ADDRESS = "tcp://localhost:0"; + + private String connectionUri; + + @Parameterized.Parameter + public PendingDurableSubscriberMessageStoragePolicy pendingDurableSubPolicy; + + @Parameterized.Parameters(name="{0}") + public static Iterable<Object[]> getTestParameters() { + return Arrays.asList(new Object[][]{{new FilePendingDurableSubscriberMessageStoragePolicy()},{new StorePendingDurableSubscriberMessageStoragePolicy()}}); + } + + @Before + public void setUp() throws Exception { + brokerService = new BrokerService(); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setPendingDurableSubscriberPolicy(pendingDurableSubPolicy); + policyMap.setDefaultEntry(defaultEntry); + brokerService.setDestinationPolicy(policyMap); + brokerService.setPersistent(false); + brokerService.setUseJmx(true); + brokerService.setDeleteAllMessagesOnStartup(true); + connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString(); + brokerService.start(); + brokerService.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + + @Test + public void testDurableConsumerEnqueueCountWithZeroPrefetch() throws Exception { + + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri); + + Connection connection = connectionFactory.createConnection(); + connection.setClientID(getClass().getName()); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createTopic("DurableTopic"); + + MessageConsumer consumer = 
session.createDurableSubscriber((Topic) destination, "EnqueueSub"); + + BrokerView view = brokerService.getAdminView(); + view.getDurableTopicSubscribers(); + + ObjectName subName = view.getDurableTopicSubscribers()[0]; + + DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean) + brokerService.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true); + + assertEquals(0, sub.getEnqueueCounter()); + assertEquals(0, sub.getDequeueCounter()); + assertEquals(0, sub.getPendingQueueSize()); + assertEquals(0, sub.getDispatchedCounter()); + assertEquals(0, sub.getDispatchedQueueSize()); + + consumer.close(); + + MessageProducer producer = session.createProducer(destination); + for (int i = 0; i < 20; i++) { + producer.send(session.createMessage()); + } + producer.close(); + + consumer = session.createDurableSubscriber((Topic) destination, "EnqueueSub"); + + Thread.sleep(1000); + + assertEquals(20, sub.getEnqueueCounter()); + assertEquals(0, sub.getDequeueCounter()); + assertEquals(0, sub.getPendingQueueSize()); + assertEquals(20, sub.getDispatchedCounter()); + assertEquals(20, sub.getDispatchedQueueSize()); + + LOG.info("Pending Queue Size with no receives: {}", sub.getPendingQueueSize()); + + assertNotNull(consumer.receive(1000)); + assertNotNull(consumer.receive(1000)); + + consumer.close(); + + Thread.sleep(2000); + + LOG.info("Pending Queue Size with two receives: {}", sub.getPendingQueueSize()); + + assertEquals(20, sub.getEnqueueCounter()); + assertEquals(2, sub.getDequeueCounter()); + assertEquals(18, sub.getPendingQueueSize()); + assertEquals(20, sub.getDispatchedCounter()); + assertEquals(0, sub.getDispatchedQueueSize()); + + session.close(); + connection.close(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4671Test.java ---------------------------------------------------------------------- 
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4671Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4671Test.java new file mode 100644 index 0000000..b69ab47 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4671Test.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.activemq.bugs;

import static org.junit.Assert.fail;

import javax.jms.Connection;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Verifies that unsubscribing a subscription name that does not exist is
 * reported to the client as an {@code InvalidDestinationException}.
 */
public class AMQ4671Test {

    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4671Test.class);
    private static BrokerService brokerService;
    private static String BROKER_ADDRESS = "tcp://localhost:0";

    private String connectionUri;

    /** Boots a non-persistent, JMX-enabled broker and records its connect URI with tracing on. */
    @Before
    public void setUp() throws Exception {
        brokerService = new BrokerService();
        brokerService.setPersistent(false);
        brokerService.setUseJmx(true);
        brokerService.setDeleteAllMessagesOnStartup(true);
        connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString() + "?trace=true";
        brokerService.start();
        brokerService.waitUntilStarted();
    }

    /** Shuts the broker down and blocks until it has stopped. */
    @After
    public void tearDown() throws Exception {
        brokerService.stop();
        brokerService.waitUntilStopped();
    }

    /**
     * Calls {@code unsubscribe} with an unknown subscription name and expects the
     * call to be rejected; the test fails if it returns normally.
     */
    @Test
    public void testNonDurableSubscriberInvalidUnsubscribe() throws Exception {
        Connection connection = new ActiveMQConnectionFactory(connectionUri).createConnection();
        connection.setClientID(getClass().getName());
        connection.start();

        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            boolean rejected = false;
            try {
                session.unsubscribe("invalid-subscription-name");
            } catch (javax.jms.InvalidDestinationException expected) {
                rejected = true;
                LOG.info("Test caught correct invalid destination exception");
            }

            if (!rejected) {
                fail("this should fail");
            }
        } finally {
            connection.close();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4677Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4677Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4677Test.java new file mode 100644 index 0000000..fd80690 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4677Test.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.activemq.bugs;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.FilenameFilter;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.ObjectName;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.leveldb.LevelDBStore;
import org.apache.activemq.leveldb.LevelDBStoreViewMBean;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sends and consumes a large number of persistent messages through a broker backed
 * by a LevelDB store, then checks that fewer than three files ending in "log"
 * remain in the data directory after compaction.
 */
public class AMQ4677Test {

    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4677Test.class);
    private static BrokerService brokerService;

    @Rule public TestName name = new TestName();

    private File dataDirFile;

    /** Starts the persistent broker "LevelDBBroker" with a LevelDB store under target/LevelDBCleanupTest. */
    @Before
    public void setUp() throws Exception {

        dataDirFile = new File("target/LevelDBCleanupTest");

        brokerService = new BrokerService();
        brokerService.setBrokerName("LevelDBBroker");
        brokerService.setPersistent(true);
        brokerService.setUseJmx(true);
        brokerService.setAdvisorySupport(false);
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.setDataDirectoryFile(dataDirFile);

        LevelDBStore persistenceFactory = new LevelDBStore();
        persistenceFactory.setDirectory(dataDirFile);
        brokerService.setPersistenceAdapter(persistenceFactory);
        brokerService.start();
        brokerService.waitUntilStarted();
    }

    /** Stops the broker, if one was created, and waits for it to shut down. */
    @After
    public void tearDown() throws Exception {
        // Guarded so that a failure while creating the broker is not masked by an NPE here.
        if (brokerService != null) {
            brokerService.stop();
            brokerService.waitUntilStopped();
        }
    }

    /**
     * Produces {@code MSG_COUNT} persistent bytes messages in a transacted session,
     * consumes them all through a listener, and then waits for the number of log
     * files to drop below three.
     */
    @Test
    public void testSendAndReceiveAllMessages() throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://LevelDBBroker");

        Connection connection = connectionFactory.createConnection();
        try {
            connection.setClientID(getClass().getName());
            connection.start();

            final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            // Name the queue after the running test method (TestName#toString is not the method name).
            Destination destination = session.createQueue(name.getMethodName());
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);

            final LevelDBStoreViewMBean levelDBView = getLevelDBStoreMBean();
            assertNotNull(levelDBView);
            levelDBView.compact();

            final int SIZE = 6 * 1024 * 5;
            final int MSG_COUNT = 60000;
            final CountDownLatch done = new CountDownLatch(MSG_COUNT);

            byte[] buffer = new byte[SIZE];
            Arrays.fill(buffer, (byte) 128);

            for (int i = 0; i < MSG_COUNT; ++i) {
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(buffer);
                producer.send(message);

                // Commit in batches of 1000 sends.
                if ((i % 1000) == 0) {
                    LOG.info("Sent message #{}", i);
                    session.commit();
                }
            }

            session.commit();

            LOG.info("Finished sending all messages.");

            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MessageListener() {

                @Override
                public void onMessage(Message message) {
                    if ((done.getCount() % 1000) == 0) {
                        try {
                            LOG.info("Received message #{}", MSG_COUNT - done.getCount());
                            session.commit();
                        } catch (JMSException e) {
                            // Keep counting so the latch can still complete, but do not hide the failure.
                            LOG.warn("Failed to commit received messages", e);
                        }
                    }
                    done.countDown();
                }
            });

            // A timeout here means messages were lost or stuck; fail instead of carrying on silently.
            assertTrue("Did not receive all messages, remaining: " + done.getCount(),
                       done.await(15, TimeUnit.MINUTES));
            session.commit();
            LOG.info("Finished receiving all messages.");

            assertTrue("Should < 3 logfiles left.", Wait.waitFor(new Wait.Condition() {

                @Override
                public boolean isSatisified() throws Exception {
                    levelDBView.compact();
                    return countLogFiles() < 3;
                }
            }, TimeUnit.MINUTES.toMillis(5), (int)TimeUnit.SECONDS.toMillis(30)));

            levelDBView.compact();
            LOG.info("Current number of logs {}", countLogFiles());
        } finally {
            // Always release the client connection, including when an assertion above fails.
            connection.close();
        }
    }

    /**
     * Counts the entries of the data directory whose name ends with "log".
     *
     * @return the number of matching entries
     * @throws IllegalStateException if the data directory cannot be listed
     */
    protected long countLogFiles() {
        String[] logFiles = dataDirFile.list(new FilenameFilter() {

            @Override
            public boolean accept(File dir, String name) {
                return name.endsWith("log");
            }
        });

        // File.list returns null when the path is not a directory or an I/O error occurs.
        if (logFiles == null) {
            throw new IllegalStateException("Can't list data directory: " + dataDirFile);
        }

        LOG.info("Current number of logs {}", logFiles.length);
        return logFiles.length;
    }

    /**
     * Looks up the single LevelDB persistence adapter MBean of "LevelDBBroker".
     *
     * @return a JMX proxy for the LevelDB store view
     * @throws IllegalStateException if the query does not match exactly one MBean
     */
    protected LevelDBStoreViewMBean getLevelDBStoreMBean() throws Exception {
        ObjectName levelDbViewMBeanQuery = new ObjectName(
            "org.apache.activemq:type=Broker,brokerName=LevelDBBroker,service=PersistenceAdapter,instanceName=LevelDB*");

        Set<ObjectName> names = brokerService.getManagementContext().queryNames(null, levelDbViewMBeanQuery);
        if (names.size() != 1) {
            throw new java.lang.IllegalStateException("Can't find levelDB store name.");
        }

        LevelDBStoreViewMBean proxy = (LevelDBStoreViewMBean) brokerService.getManagementContext()
            .newProxyInstance(names.iterator().next(), LevelDBStoreViewMBean.class, true);
        return proxy;
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4853Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4853Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4853Test.java new file mode 100644 index 0000000..a347279 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4853Test.java @@ -0,0 +1,300 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.activemq.bugs;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SessionInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises adding and removing consumers on the {@link AdvisoryBroker}, both through
 * real client connections and by driving the advisory broker directly.
 */
public class AMQ4853Test {

    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4853Test.class);
    private static BrokerService brokerService;
    private static final String BROKER_ADDRESS = "tcp://localhost:0";
    private static final ActiveMQQueue DESTINATION = new ActiveMQQueue("TEST.QUEUE");

    /** Latch consulted by {@link #done()}; set by {@link #test()} before its worker threads start. */
    private CountDownLatch cycleDoneLatch;

    private String connectionUri;

    /** Starts a non-persistent broker with advisory support enabled and JMX disabled. */
    @Before
    public void setUp() throws Exception {
        brokerService = new BrokerService();
        brokerService.setPersistent(false);
        brokerService.setUseJmx(false);
        brokerService.setAdvisorySupport(true);
        brokerService.setDeleteAllMessagesOnStartup(true);
        connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();

        brokerService.start();
        brokerService.waitUntilStarted();
    }

    /** Stops the broker, if one was created, and waits for it to shut down. */
    @After
    public void tearDown() throws Exception {
        // Guarded so that a failure while creating the broker is not masked by an NPE here.
        if (brokerService != null) {
            brokerService.stop();
            brokerService.waitUntilStopped();
        }
    }

    /**
     * Shows the performance of removing consumers while others stay active.
     *
     * @throws Exception if the consumers cannot be created or the cycles do not finish in time
     */
    @Ignore
    @Test
    public void test() throws Exception {

        // Create a stable set of consumers to fill in the advisory broker's consumer list.
        final int fixedConsumerCount = 200;
        List<Consumer> fixedConsumers = new ArrayList<Consumer>(fixedConsumerCount);
        try {
            for (int i = 0; i < fixedConsumerCount; ++i) {
                fixedConsumers.add(new Consumer());
            }

            // Create a set of consumers that comes online for a short time and then
            // goes offline again. Cycles will repeat as each batch completes
            final int fixedDelayConsumers = 300;
            final int fixedDelayCycles = 25;

            final CountDownLatch fixedDelayCycleLatch = new CountDownLatch(fixedDelayCycles);

            // Update so done method can track state.
            cycleDoneLatch = fixedDelayCycleLatch;

            CyclicBarrier barrier = new CyclicBarrier(fixedDelayConsumers, new Runnable() {
                @Override
                public void run() {
                    LOG.info("Fixed delay consumers cycle {} completed.", fixedDelayCycleLatch.getCount());
                    fixedDelayCycleLatch.countDown();
                }
            });

            for (int i = 0; i < fixedDelayConsumers; ++i) {
                new Thread(new FixedDelyConsumer(barrier)).start();
            }

            // A timeout means the cycles never completed; fail instead of passing silently.
            assertTrue("Fixed delay cycles did not complete, remaining: " + fixedDelayCycleLatch.getCount(),
                       fixedDelayCycleLatch.await(10, TimeUnit.MINUTES));
        } finally {
            // Clean up, including when consumer creation or the wait above fails.
            for (Consumer consumer : fixedConsumers) {
                consumer.close();
            }
            fixedConsumers.clear();
        }
    }

    /** Builds a {@link ConnectionInfo} with a fixed connection id. */
    private ConnectionInfo createConnectionInfo() {
        ConnectionId id = new ConnectionId();
        id.setValue("ID:123456789:0:1");

        ConnectionInfo info = new ConnectionInfo();
        info.setConnectionId(id);
        return info;
    }

    /** Builds a {@link SessionInfo} for session 1 of the given connection. */
    private SessionInfo createSessionInfo(ConnectionInfo connection) {
        SessionId id = new SessionId(connection.getConnectionId(), 1);

        SessionInfo info = new SessionInfo();
        info.setSessionId(id);

        return info;
    }

    /**
     * Builds a {@link ConsumerInfo} on the given destination.
     *
     * @param session session whose connection id is copied into the consumer id
     * @param value consumer id value
     * @param destination destination the consumer is attached to
     * @return the populated consumer info
     */
    public ConsumerInfo createConsumerInfo(SessionInfo session, int value, ActiveMQDestination destination) {
        ConsumerId id = new ConsumerId();
        id.setConnectionId(session.getSessionId().getConnectionId());
        id.setSessionId(1);
        id.setValue(value);

        ConsumerInfo info = new ConsumerInfo();
        info.setConsumerId(id);
        info.setDestination(destination);
        return info;
    }

    /**
     * Shows the performance impact of removing consumers in various scenarios:
     * consumers are added and then removed in reverse order and in insertion order.
     *
     * @throws Exception if the advisory broker rejects an add or remove
     */
    @Ignore
    @Test
    public void testPerformanceOfRemovals() throws Exception {
        // setup
        AdvisoryBroker testObj = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
        ActiveMQDestination destination = new ActiveMQQueue("foo");
        ConnectionInfo connectionInfo = createConnectionInfo();
        ConnectionContext connectionContext = new ConnectionContext(connectionInfo);
        connectionContext.setBroker(brokerService.getBroker());
        SessionInfo sessionInfo = createSessionInfo(connectionInfo);

        long start = System.currentTimeMillis();

        for (int i = 0; i < 200; ++i) {

            for (int j = 1; j <= 500; j++) {
                ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination);
                testObj.addConsumer(connectionContext, consumerInfo);
            }

            // Remove in reverse order of insertion.
            for (int j = 500; j > 0; j--) {
                ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination);
                testObj.removeConsumer(connectionContext, consumerInfo);
            }

            for (int j = 1; j <= 500; j++) {
                ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination);
                testObj.addConsumer(connectionContext, consumerInfo);
            }

            // Remove in insertion order.
            for (int j = 1; j <= 500; j++) {
                ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination);
                testObj.removeConsumer(connectionContext, consumerInfo);
            }
        }

        long finish = System.currentTimeMillis();

        long totalTime = finish - start;

        LOG.info("Total test time: {} seconds", TimeUnit.MILLISECONDS.toSeconds(totalTime));

        assertEquals(0, testObj.getAdvisoryConsumers().size());
    }

    /**
     * Adds five consumers and removes them again using freshly built, equal
     * {@link ConsumerInfo} instances; the advisory consumer list must end up empty.
     */
    @Test
    public void testEqualsNeeded() throws Exception {
        // setup
        AdvisoryBroker testObj = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
        ActiveMQDestination destination = new ActiveMQQueue("foo");
        ConnectionInfo connectionInfo = createConnectionInfo();
        ConnectionContext connectionContext = new ConnectionContext(connectionInfo);
        connectionContext.setBroker(brokerService.getBroker());
        SessionInfo sessionInfo = createSessionInfo(connectionInfo);

        for (int j = 1; j <= 5; j++) {
            ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination);
            testObj.addConsumer(connectionContext, consumerInfo);
        }

        for (int j = 1; j <= 5; j++) {
            ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, j, destination);
            testObj.removeConsumer(connectionContext, consumerInfo);
        }

        assertEquals(0, testObj.getAdvisoryConsumers().size());
    }

    /** @return true when no cycle latch is set or the latch has reached zero */
    private boolean done() {
        if (cycleDoneLatch == null) {
            return true;
        }
        return cycleDoneLatch.getCount() == 0;
    }

    /** A client connection with one listener-based consumer on {@code DESTINATION}. */
    class Consumer implements MessageListener {

        Connection connection;
        Session session;
        Destination destination;
        MessageConsumer consumer;

        Consumer() throws JMSException {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
            connection = factory.createConnection();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            consumer = session.createConsumer(DESTINATION);
            consumer.setMessageListener(this);
            connection.start();
        }

        @Override
        public void onMessage(Message message) {
        }

        /** Closes the connection on a best-effort basis and drops all JMS references. */
        public void close() {
            try {
                connection.close();
            } catch(Exception ignored) {
                // Best-effort cleanup: a failed close must not abort the surrounding test.
            }

            connection = null;
            session = null;
            consumer = null;
        }
    }

    /**
     * Worker that repeatedly opens a {@link Consumer}, keeps it for a fixed delay,
     * closes it, and then waits on the shared barrier until {@link #done()} is true.
     */
    class FixedDelyConsumer implements Runnable {

        private final CyclicBarrier barrier;
        private final int sleepInterval;

        public FixedDelyConsumer(CyclicBarrier barrier) {
            this.barrier = barrier;
            this.sleepInterval = 1000;
        }

        public FixedDelyConsumer(CyclicBarrier barrier, int sleepInterval) {
            this.barrier = barrier;
            this.sleepInterval = sleepInterval;
        }

        @Override
        public void run() {
            while (!done()) {

                try {
                    Consumer consumer = new Consumer();
                    try {
                        TimeUnit.MILLISECONDS.sleep(sleepInterval);
                    } finally {
                        // Close even when the sleep is interrupted so the connection is not leaked.
                        consumer.close();
                    }
                    barrier.await();
                } catch (InterruptedException ex) {
                    // Preserve the interrupt status for whoever owns this thread.
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception ex) {
                    LOG.debug("Fixed delay consumer stopping after failure", ex);
                    return;
                }
            }
        }
    }

}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4887Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4887Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4887Test.java new file mode 100644 index 0000000..cf33ece --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4887Test.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.activemq.bugs;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageEOFException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.StreamMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Re-sends the same bytes/stream message several times, setting a property and
 * appending one boolean between sends, and checks that every received copy carries
 * exactly the body and property that were current when it was sent.
 */
public class AMQ4887Test {
    private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4887Test.class);
    private static final int ITERATIONS = 10;

    @Rule
    public TestName name = new TestName();

    @Test
    public void testBytesMessageSetPropertyBeforeCopy() throws Exception {
        Connection connection = createStartedConnection(false);
        try {
            doTestBytesMessageSetPropertyBeforeCopy(connection);
        } finally {
            connection.close();
        }
    }

    @Test
    public void testBytesMessageSetPropertyBeforeCopyCompressed() throws Exception {
        Connection connection = createStartedConnection(true);
        try {
            doTestBytesMessageSetPropertyBeforeCopy(connection);
        } finally {
            connection.close();
        }
    }

    /**
     * Sends one {@link BytesMessage} instance {@code ITERATIONS} times; on iteration
     * {@code i} the received copy must contain {@code i} booleans and the "sendTime"
     * property set just before that send.
     *
     * @param connection a started connection; the caller remains responsible for closing it
     */
    public void doTestBytesMessageSetPropertyBeforeCopy(Connection connection) throws Exception {

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Name the queue after the running test method (TestName#toString is not the method name).
        Destination destination = session.createQueue(name.getMethodName());
        MessageConsumer consumer = session.createConsumer(destination);
        MessageProducer producer = session.createProducer(destination);

        BytesMessage message = session.createBytesMessage();

        for (int i = 0; i < ITERATIONS; i++) {

            long sendTime = System.currentTimeMillis();
            message.setLongProperty("sendTime", sendTime);
            producer.send(message);

            LOG.debug("Receiving message {}", i);
            Message receivedMessage = consumer.receive(5000);
            assertNotNull("On message " + i, receivedMessage);
            assertTrue("On message " + i, receivedMessage instanceof BytesMessage);

            BytesMessage receivedBytesMessage = (BytesMessage) receivedMessage;

            int numElements = 0;
            try {
                while (true) {
                    receivedBytesMessage.readBoolean();
                    numElements++;
                }
            } catch (MessageEOFException expected) {
                // End of the body reached; numElements now holds the number of booleans read.
            }

            LOG.info("Iteration [{}]: Received Message contained {} boolean values.", i, numElements);
            assertEquals(i, numElements);

            long receivedSendTime = receivedBytesMessage.getLongProperty("sendTime");
            assertEquals("On message " + i, sendTime, receivedSendTime);

            // Add a new bool value on each iteration.
            message.writeBoolean(true);
        }
    }

    @Test
    public void testStreamMessageSetPropertyBeforeCopy() throws Exception {
        Connection connection = createStartedConnection(false);
        try {
            doTestStreamMessageSetPropertyBeforeCopy(connection);
        } finally {
            connection.close();
        }
    }

    @Test
    public void testStreamMessageSetPropertyBeforeCopyCompressed() throws Exception {
        Connection connection = createStartedConnection(true);
        try {
            doTestStreamMessageSetPropertyBeforeCopy(connection);
        } finally {
            connection.close();
        }
    }

    /**
     * Sends one {@link StreamMessage} instance {@code ITERATIONS} times; on iteration
     * {@code i} the received copy must contain {@code i} booleans and the "sendTime"
     * property set just before that send.
     *
     * @param connection a started connection; the caller remains responsible for closing it
     */
    public void doTestStreamMessageSetPropertyBeforeCopy(Connection connection) throws Exception {

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Name the queue after the running test method (TestName#toString is not the method name).
        Destination destination = session.createQueue(name.getMethodName());
        MessageConsumer consumer = session.createConsumer(destination);
        MessageProducer producer = session.createProducer(destination);

        StreamMessage message = session.createStreamMessage();

        for (int i = 0; i < ITERATIONS; i++) {

            long sendTime = System.currentTimeMillis();
            message.setLongProperty("sendTime", sendTime);
            producer.send(message);

            LOG.debug("Receiving message {}", i);
            Message receivedMessage = consumer.receive(5000);
            assertNotNull("On message " + i, receivedMessage);
            assertTrue("On message " + i, receivedMessage instanceof StreamMessage);

            StreamMessage receivedStreamMessage = (StreamMessage) receivedMessage;

            int numElements = 0;
            try {
                while (true) {
                    receivedStreamMessage.readBoolean();
                    numElements++;
                }
            } catch (MessageEOFException expected) {
                // End of the stream reached; numElements now holds the number of booleans read.
            }

            LOG.info("Iteration [{}]: Received Message contained {} boolean values.", i, numElements);
            assertEquals(i, numElements);

            long receivedSendTime = receivedStreamMessage.getLongProperty("sendTime");
            assertEquals("On message " + i, sendTime, receivedSendTime);

            // Add a new bool value on each iteration.
            message.writeBoolean(true);
        }
    }

    /**
     * Opens and starts a connection to "vm://localhost".
     *
     * @param useCompression whether to enable compression on the connection factory
     * @return the started connection; the caller must close it
     */
    private Connection createStartedConnection(boolean useCompression) throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
        if (useCompression) {
            connectionFactory.setUseCompression(true);
        }
        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
        connection.start();
        return connection;
    }

}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java new file mode 100644 index 0000000..026a4be --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.activemq.bugs; + +import java.io.IOException; +import java.util.Map; + +import javax.jms.JMSException; + +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.util.ByteSequence; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ4893Test { + + private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4893Test.class); + + @Test + public void testPropertiesInt() throws Exception { + ActiveMQObjectMessage message = new ActiveMQObjectMessage(); + message.setIntProperty("TestProp", 333); + fakeUnmarshal(message); + roundTripProperties(message); + } + + @Test + public void testPropertiesString() throws Exception { + ActiveMQObjectMessage message = new ActiveMQObjectMessage(); + message.setStringProperty("TestProp", "Value"); + fakeUnmarshal(message); + roundTripProperties(message); + } + + @Test + public void testPropertiesObject() throws Exception { + ActiveMQObjectMessage message = new ActiveMQObjectMessage(); + message.setObjectProperty("TestProp", "Value"); + fakeUnmarshal(message); + roundTripProperties(message); + } + + @Test + public void testPropertiesObjectNoMarshalling() throws Exception { + ActiveMQObjectMessage message = new ActiveMQObjectMessage(); + message.setObjectProperty("TestProp", "Value"); + roundTripProperties(message); + } + + private void roundTripProperties(ActiveMQObjectMessage message) throws IOException, JMSException { + ActiveMQObjectMessage copy = new ActiveMQObjectMessage(); + for (Map.Entry<String, Object> prop : message.getProperties().entrySet()) { + LOG.debug("{} -> {}", prop.getKey(), prop.getValue().getClass()); + copy.setObjectProperty(prop.getKey(), prop.getValue()); + } + } + + private void fakeUnmarshal(ActiveMQObjectMessage message) throws IOException { + // we need to force the unmarshalled property field to be set so it + // gives us a hawtbuffer for the 
string + OpenWireFormat format = new OpenWireFormat(); + message.beforeMarshall(format); + message.afterMarshall(format); + + ByteSequence seq = message.getMarshalledProperties(); + message.clearProperties(); + message.setMarshalledProperties(seq); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java new file mode 100644 index 0000000..81140ce --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; +import org.apache.activemq.broker.region.virtual.VirtualTopic; +import org.apache.activemq.plugin.SubQueueSelectorCacheBrokerPlugin; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class AMQ4899Test { + protected static final Logger LOG = LoggerFactory.getLogger(AMQ4899Test.class); + private static final String QUEUE_NAME="AMQ4899TestQueue"; + private static final String CONSUMER_QUEUE="Consumer.Orders.VirtualOrders." + QUEUE_NAME; + private static final String PRODUCER_DESTINATION_NAME = "VirtualOrders." 
+ QUEUE_NAME; + + private static final Integer MESSAGE_LIMIT = 20; + public static final String CONSUMER_A_SELECTOR = "Order < " + 10; + public static String CONSUMER_B_SELECTOR = "Order >= " + 10; + private CountDownLatch consumersStarted = new CountDownLatch(2); + private CountDownLatch consumerAtoConsumeCount= new CountDownLatch(10); + private CountDownLatch consumerBtoConsumeCount = new CountDownLatch(10); + + private BrokerService broker; + + @Before + public void setUp() { + setupBroker("broker://()/localhost?"); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + @Test(timeout = 60 * 1000) + public void testVirtualTopicMultipleSelectors() throws Exception{ + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue consumerQueue = session.createQueue(CONSUMER_QUEUE); + + MessageListener listenerA = new AMQ4899Listener("A", consumersStarted, consumerAtoConsumeCount); + MessageConsumer consumerA = session.createConsumer(consumerQueue, CONSUMER_A_SELECTOR); + consumerA.setMessageListener(listenerA); + + MessageListener listenerB = new AMQ4899Listener("B", consumersStarted, consumerBtoConsumeCount); + MessageConsumer consumerB = session.createConsumer(consumerQueue, CONSUMER_B_SELECTOR); + consumerB.setMessageListener(listenerB); + + consumersStarted.await(10, TimeUnit.SECONDS); + assertEquals("Not all consumers started in time", 0, consumersStarted.getCount()); + + Destination producerDestination = session.createTopic(PRODUCER_DESTINATION_NAME); + MessageProducer producer = session.createProducer(producerDestination); + int messageIndex = 0; + for (int i=0; i < MESSAGE_LIMIT; i++) { + if (i==3) { + LOG.debug("Stopping consumerA"); + consumerA.close(); + } + + if (i == 14) { + 
LOG.debug("Stopping consumer B"); + consumerB.close(); + } + String messageText = "hello " + messageIndex++ + " sent at " + new java.util.Date().toString(); + TextMessage message = session.createTextMessage(messageText); + message.setIntProperty("Order", i); + LOG.debug("Sending message [{}]", messageText); + producer.send(message); + Thread.sleep(100); + } + Thread.sleep(1 * 1000); + + // restart consumerA + LOG.debug("Restarting consumerA"); + consumerA = session.createConsumer(consumerQueue, CONSUMER_A_SELECTOR); + consumerA.setMessageListener(listenerA); + + // restart consumerB + LOG.debug("restarting consumerB"); + consumerB = session.createConsumer(consumerQueue, CONSUMER_B_SELECTOR); + consumerB.setMessageListener(listenerB); + + consumerAtoConsumeCount.await(5, TimeUnit.SECONDS); + consumerBtoConsumeCount.await(5, TimeUnit.SECONDS); + + LOG.debug("Unconsumed messages for consumerA {} consumerB {}", consumerAtoConsumeCount.getCount(), consumerBtoConsumeCount.getCount()); + + assertEquals("Consumer A did not consume all messages", 0, consumerAtoConsumeCount.getCount()); + assertEquals("Consumer B did not consume all messages", 0, consumerBtoConsumeCount.getCount()); + + connection.close(); + } + + /** + * Setup broker with VirtualTopic configured + */ + private void setupBroker(String uri) { + try { + broker = BrokerFactory.createBroker(uri); + + VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); + VirtualTopic virtualTopic = new VirtualTopic(); + virtualTopic.setName("VirtualOrders.>"); + virtualTopic.setSelectorAware(true); + VirtualDestination[] virtualDestinations = { virtualTopic }; + interceptor.setVirtualDestinations(virtualDestinations); + broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor}); + + SubQueueSelectorCacheBrokerPlugin subQueueSelectorCacheBrokerPlugin = new SubQueueSelectorCacheBrokerPlugin(); + BrokerPlugin[] updatedPlugins = {subQueueSelectorCacheBrokerPlugin}; + 
broker.setPlugins(updatedPlugins); + + broker.start(); + broker.waitUntilStarted(); + } catch (Exception e) { + LOG.error("Failed creating broker", e); + } + } +} + +class AMQ4899Listener implements MessageListener { + Logger LOG = LoggerFactory.getLogger(AMQ4899Listener.class); + CountDownLatch toConsume; + String id; + + public AMQ4899Listener(String id, CountDownLatch started, CountDownLatch toConsume) { + this.id = id; + this.toConsume = toConsume; + started.countDown(); + } + + @Override + public void onMessage(Message message) { + toConsume.countDown(); + try { + if (message instanceof TextMessage) { + TextMessage textMessage = (TextMessage) message; + LOG.debug("Listener {} received [{}]", id, textMessage.getText()); + } else { + LOG.error("Listener {} Expected a TextMessage, got {}", id, message.getClass().getCanonicalName()); + } + } catch (JMSException e) { + LOG.error("Unexpected JMSException in Listener " + id, e); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java new file mode 100644 index 0000000..e65ad91 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.MessageProducer; +import javax.jms.Session; +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ4930Test extends TestCase { + private static final Logger LOG = LoggerFactory.getLogger(AMQ4930Test.class); + final int messageCount = 150; + final int messageSize = 1024*1024; + final int maxBrowsePageSize = 50; + final ActiveMQQueue bigQueue = new ActiveMQQueue("BIG"); + BrokerService broker; + ActiveMQConnectionFactory factory; + + protected void configureBroker() throws Exception { + broker.setDeleteAllMessagesOnStartup(true); + broker.setAdvisorySupport(false); + broker.getSystemUsage().getMemoryUsage().setLimit(1*1024*1024); + + PolicyMap pMap = new PolicyMap(); + PolicyEntry policy = new PolicyEntry(); + // disable expriy processing as this will call browse in parallel + policy.setExpireMessagesPeriod(0); + policy.setMaxPageSize(maxBrowsePageSize); + 
policy.setMaxBrowsePageSize(maxBrowsePageSize); + pMap.setDefaultEntry(policy); + + broker.setDestinationPolicy(pMap); + } + + public void testBrowsePendingNonPersistent() throws Exception { + doTestBrowsePending(DeliveryMode.NON_PERSISTENT); + } + + public void testBrowsePendingPersistent() throws Exception { + doTestBrowsePending(DeliveryMode.PERSISTENT); + } + + public void testWithStatsDisabled() throws Exception { + ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().setEnabled(false); + doTestBrowsePending(DeliveryMode.PERSISTENT); + } + + public void doTestBrowsePending(int deliveryMode) throws Exception { + + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(bigQueue); + producer.setDeliveryMode(deliveryMode); + BytesMessage bytesMessage = session.createBytesMessage(); + bytesMessage.writeBytes(new byte[messageSize]); + + for (int i = 0; i < messageCount; i++) { + producer.send(bigQueue, bytesMessage); + } + + final QueueViewMBean queueViewMBean = (QueueViewMBean) + broker.getManagementContext().newProxyInstance(broker.getAdminView().getQueues()[0], QueueViewMBean.class, false); + + LOG.info(queueViewMBean.getName() + " Size: " + queueViewMBean.getEnqueueCount()); + + connection.close(); + + assertFalse("Cache disabled on q", queueViewMBean.isCacheEnabled()); + + // ensure repeated browse does now blow mem + + final Queue underTest = (Queue) ((RegionBroker)broker.getRegionBroker()).getQueueRegion().getDestinationMap().get(bigQueue); + + // do twice to attempt to pull in 2*maxBrowsePageSize which uses up the system memory limit + Message[] browsed = underTest.browse(); + LOG.info("Browsed: " + browsed.length); + assertEquals("maxBrowsePageSize", maxBrowsePageSize, browsed.length); + browsed = underTest.browse(); + LOG.info("Browsed: " + browsed.length); + assertEquals("maxBrowsePageSize", 
maxBrowsePageSize, browsed.length); + Runtime.getRuntime().gc(); + long free = Runtime.getRuntime().freeMemory()/1024; + LOG.info("free at start of check: " + free); + // check for memory growth + for (int i=0; i<10; i++) { + LOG.info("free: " + Runtime.getRuntime().freeMemory()/1024); + browsed = underTest.browse(); + LOG.info("Browsed: " + browsed.length); + assertEquals("maxBrowsePageSize", maxBrowsePageSize, browsed.length); + Runtime.getRuntime().gc(); + Runtime.getRuntime().gc(); + assertTrue("No growth: " + Runtime.getRuntime().freeMemory()/1024 + " >= " + (free - (free * 0.2)), Runtime.getRuntime().freeMemory()/1024 >= (free - (free * 0.2))); + } + } + + + protected void setUp() throws Exception { + super.setUp(); + broker = new BrokerService(); + broker.setBrokerName("thisOne"); + configureBroker(); + broker.start(); + factory = new ActiveMQConnectionFactory("vm://thisOne?jms.alwaysSyncSend=true"); + factory.setWatchTopicAdvisories(false); + + } + + protected void tearDown() throws Exception { + super.tearDown(); + if (broker != null) { + broker.stop(); + broker = null; + } + } + +} \ No newline at end of file
