http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java new file mode 100644 index 0000000..d624d36 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeStreamletTest.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Destination; +import javax.jms.Session; + +import junit.framework.TestCase; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author rnewson + */ +public final class LargeStreamletTest extends TestCase { + + private static final Logger LOG = LoggerFactory.getLogger(LargeStreamletTest.class); + private static final String BROKER_URL = "vm://localhost?broker.persistent=false"; + private static final int BUFFER_SIZE = 1 * 1024; + private static final int MESSAGE_COUNT = 10 * 1024; + + protected Exception writerException; + protected Exception readerException; + + private final AtomicInteger totalRead = new AtomicInteger(); + private final AtomicInteger totalWritten = new AtomicInteger(); + private final AtomicBoolean stopThreads = new AtomicBoolean(false); + + public void testStreamlets() throws Exception { + final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL); + + final ActiveMQConnection connection = (ActiveMQConnection)factory.createConnection(); + connection.start(); + try { + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + try { + final Destination destination = session.createQueue("wibble"); + final Thread readerThread = new Thread(new Runnable() { + + @Override + public void run() { + totalRead.set(0); + try { + final InputStream inputStream = 
connection.createInputStream(destination); + try { + int read; + final byte[] buf = new byte[BUFFER_SIZE]; + while (!stopThreads.get() && (read = inputStream.read(buf)) != -1) { + totalRead.addAndGet(read); + } + } finally { + inputStream.close(); + } + } catch (Exception e) { + readerException = e; + e.printStackTrace(); + } finally { + LOG.info(totalRead + " total bytes read."); + } + } + }); + + final Thread writerThread = new Thread(new Runnable() { + private final Random random = new Random(); + + @Override + public void run() { + totalWritten.set(0); + int count = MESSAGE_COUNT; + try { + final OutputStream outputStream = connection.createOutputStream(destination); + try { + final byte[] buf = new byte[BUFFER_SIZE]; + random.nextBytes(buf); + while (count > 0 && !stopThreads.get()) { + outputStream.write(buf); + totalWritten.addAndGet(buf.length); + count--; + } + } finally { + outputStream.close(); + } + } catch (Exception e) { + writerException = e; + e.printStackTrace(); + } finally { + LOG.info(totalWritten + " total bytes written."); + } + } + }); + + readerThread.start(); + writerThread.start(); + + // Wait till reader is has finished receiving all the messages + // or he has stopped + // receiving messages. + Thread.sleep(1000); + int lastRead = totalRead.get(); + while (readerThread.isAlive()) { + readerThread.join(1000); + // No progress?? then stop waiting.. + if (lastRead == totalRead.get()) { + break; + } + lastRead = totalRead.get(); + } + + stopThreads.set(true); + + assertTrue("Should not have received a reader exception", readerException == null); + assertTrue("Should not have received a writer exception", writerException == null); + + assertEquals("Not all messages accounted for", totalWritten.get(), totalRead.get()); + + } finally { + session.close(); + } + } finally { + connection.close(); + } + } + +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LoadTestBurnIn.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LoadTestBurnIn.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LoadTestBurnIn.java new file mode 100644 index 0000000..4462844 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LoadTestBurnIn.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; + +import junit.framework.Test; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQDestination; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Small burn test moves sends a moderate amount of messages through the broker, + * to checking to make sure that the broker does not lock up after a while of + * sustained messaging. 
+ * + * + */ +public class LoadTestBurnIn extends JmsTestSupport { + private static final transient Logger LOG = LoggerFactory.getLogger(LoadTestBurnIn.class); + + public ActiveMQDestination destination; + public int deliveryMode; + public byte destinationType; + public boolean durableConsumer; + public int messageCount = 50000; + public int messageSize = 1024; + + public static Test suite() { + return suite(LoadTestBurnIn.class); + } + + protected void setUp() throws Exception { + LOG.info("Start: " + getName()); + super.setUp(); + } + + protected void tearDown() throws Exception { + try { + super.tearDown(); + } catch (Throwable e) { + e.printStackTrace(System.out); + } finally { + LOG.info("End: " + getName()); + } + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + + protected BrokerService createBroker() throws Exception { + return BrokerFactory.createBroker(new URI("broker://(tcp://localhost:0)?useJmx=true")); + // return BrokerFactory.createBroker(new + // URI("xbean:org/apache/activemq/broker/store/loadtester.xml")); + } + + protected ConnectionFactory createConnectionFactory() throws URISyntaxException, IOException { + return new ActiveMQConnectionFactory(((TransportConnector)broker.getTransportConnectors().get(0)) + .getServer().getConnectURI()); + } + + public void initCombosForTestSendReceive() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), + Integer.valueOf(DeliveryMode.PERSISTENT)}); + addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)}); + addCombinationValues("durableConsumer", new Object[] {Boolean.TRUE}); + addCombinationValues("messageSize", new Object[] {Integer.valueOf(101), Integer.valueOf(102), + Integer.valueOf(103), Integer.valueOf(104), + Integer.valueOf(105), Integer.valueOf(106), + Integer.valueOf(107), Integer.valueOf(108)}); + } + + public void testSendReceive() throws Exception { + + // 
Durable consumer combination is only valid with topics + if (durableConsumer && destinationType != ActiveMQDestination.TOPIC_TYPE) { + return; + } + + connection.setClientID(getName()); + connection.getPrefetchPolicy().setAll(1000); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageConsumer consumer; + if (durableConsumer) { + consumer = session.createDurableSubscriber((Topic)destination, "sub1:" + + System.currentTimeMillis()); + } else { + consumer = session.createConsumer(destination); + } + profilerPause("Ready: "); + + final CountDownLatch producerDoneLatch = new CountDownLatch(1); + + // Send the messages, async + new Thread() { + public void run() { + Connection connection2 = null; + try { + connection2 = factory.createConnection(); + Session session = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(deliveryMode); + for (int i = 0; i < messageCount; i++) { + BytesMessage m = session.createBytesMessage(); + m.writeBytes(new byte[messageSize]); + producer.send(m); + } + producer.close(); + } catch (JMSException e) { + e.printStackTrace(); + } finally { + safeClose(connection2); + producerDoneLatch.countDown(); + } + + } + }.start(); + + // Make sure all the messages were delivered. + Message message = null; + for (int i = 0; i < messageCount; i++) { + message = consumer.receive(5000); + assertNotNull("Did not get message: " + i, message); + } + + profilerPause("Done: "); + + assertNull(consumer.receiveNoWait()); + message.acknowledge(); + + // Make sure the producer thread finishes. 
+ assertTrue(producerDoneLatch.await(5, TimeUnit.SECONDS)); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageEvictionTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageEvictionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageEvictionTest.java new file mode 100644 index 0000000..b079070 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageEvictionTest.java @@ -0,0 +1,288 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; + +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; +import org.apache.activemq.broker.region.policy.FilePendingSubscriberMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; +import org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MessageEvictionTest { + static final Logger LOG = LoggerFactory.getLogger(MessageEvictionTest.class); + private BrokerService broker; + private ConnectionFactory connectionFactory; + Connection connection; + private Session session; + 
private Topic destination; + private final String destinationName = "verifyEvection"; + protected int numMessages = 2000; + protected String payload = new String(new byte[1024*2]); + + public void setUp(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy) throws Exception { + broker = createBroker(pendingSubscriberPolicy); + broker.start(); + connectionFactory = createConnectionFactory(); + connection = connectionFactory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + destination = session.createTopic(destinationName); + } + + @After + public void tearDown() throws Exception { + connection.stop(); + broker.stop(); + } + + @Test + public void testMessageEvictionMemoryUsageFileCursor() throws Exception { + setUp(new FilePendingSubscriberMessageStoragePolicy()); + doTestMessageEvictionMemoryUsage(); + } + + @Test + public void testMessageEvictionMemoryUsageVmCursor() throws Exception { + setUp(new VMPendingSubscriberMessageStoragePolicy()); + doTestMessageEvictionMemoryUsage(); + } + + @Test + public void testMessageEvictionDiscardedAdvisory() throws Exception { + setUp(new VMPendingSubscriberMessageStoragePolicy()); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + final CountDownLatch consumerRegistered = new CountDownLatch(1); + final CountDownLatch gotAdvisory = new CountDownLatch(1); + final CountDownLatch advisoryIsGood = new CountDownLatch(1); + + executor.execute(new Runnable() { + @Override + public void run() { + try { + ActiveMQTopic discardedAdvisoryDestination = + AdvisorySupport.getMessageDiscardedAdvisoryTopic(destination); + // use separate session rather than asyncDispatch on consumer session + // as we want consumer session to block + Session advisorySession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = advisorySession.createConsumer(discardedAdvisoryDestination); + consumer.setMessageListener(new 
MessageListener() { + int advisoriesReceived = 0; + @Override + public void onMessage(Message message) { + try { + LOG.info("advisory:" + message); + ActiveMQMessage activeMQMessage = (ActiveMQMessage) message; + assertNotNull(activeMQMessage.getStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID)); + assertEquals(++advisoriesReceived, activeMQMessage.getIntProperty(AdvisorySupport.MSG_PROPERTY_DISCARDED_COUNT)); + message.acknowledge(); + advisoryIsGood.countDown(); + } catch (JMSException e) { + e.printStackTrace(); + fail(e.toString()); + } finally { + gotAdvisory.countDown(); + } + } + }); + consumerRegistered.countDown(); + gotAdvisory.await(120, TimeUnit.SECONDS); + consumer.close(); + advisorySession.close(); + } catch (Exception e) { + e.printStackTrace(); + fail(e.toString()); + } + } + }); + assertTrue("we have an advisory consumer", consumerRegistered.await(60, TimeUnit.SECONDS)); + doTestMessageEvictionMemoryUsage(); + assertTrue("got an advisory for discarded", gotAdvisory.await(0, TimeUnit.SECONDS)); + assertTrue("advisory is good",advisoryIsGood.await(0, TimeUnit.SECONDS)); + } + + public void doTestMessageEvictionMemoryUsage() throws Exception { + + ExecutorService executor = Executors.newCachedThreadPool(); + final CountDownLatch doAck = new CountDownLatch(1); + final CountDownLatch ackDone = new CountDownLatch(1); + final CountDownLatch consumerRegistered = new CountDownLatch(1); + executor.execute(new Runnable() { + @Override + public void run() { + try { + final MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + try { + // very slow, only ack once + doAck.await(60, TimeUnit.SECONDS); + LOG.info("acking: " + message.getJMSMessageID()); + message.acknowledge(); + ackDone.countDown(); + } catch (Exception e) { + e.printStackTrace(); + fail(e.toString()); + } finally { + consumerRegistered.countDown(); + 
ackDone.countDown(); + } + } + }); + consumerRegistered.countDown(); + ackDone.await(60, TimeUnit.SECONDS); + consumer.close(); + } catch (Exception e) { + e.printStackTrace(); + fail(e.toString()); + } + } + }); + + assertTrue("we have a consumer", consumerRegistered.await(10, TimeUnit.SECONDS)); + + final AtomicInteger sent = new AtomicInteger(0); + final CountDownLatch sendDone = new CountDownLatch(1); + executor.execute(new Runnable() { + @Override + public void run() { + MessageProducer producer; + try { + producer = session.createProducer(destination); + for (int i=0; i< numMessages; i++) { + producer.send(session.createTextMessage(payload)); + sent.incrementAndGet(); + TimeUnit.MILLISECONDS.sleep(10); + } + producer.close(); + sendDone.countDown(); + } catch (Exception e) { + sendDone.countDown(); + e.printStackTrace(); + fail(e.toString()); + } + } + }); + + assertTrue("messages sending done", sendDone.await(180, TimeUnit.SECONDS)); + assertEquals("all message were sent", numMessages, sent.get()); + + doAck.countDown(); + executor.shutdown(); + executor.awaitTermination(30, TimeUnit.SECONDS); + + assertTrue("usage goes to 0 once consumer goes away", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 0 == TestSupport.getDestination(broker, + ActiveMQDestination.transform(destination)).getMemoryUsage().getPercentUsage(); + } + })); + } + + BrokerService createBroker(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy) throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.addConnector("tcp://localhost:0"); + brokerService.setUseJmx(false); + brokerService.setDeleteAllMessagesOnStartup(true); + + // spooling to disk early so topic memory limit is not reached + brokerService.getSystemUsage().getMemoryUsage().setLimit(500*1024); + + final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>(); + final PolicyEntry entry = new PolicyEntry(); + 
entry.setTopic(">"); + + entry.setAdvisoryForDiscardingMessages(true); + + // so consumer does not get over run while blocked limit the prefetch + entry.setTopicPrefetch(50); + + + entry.setPendingSubscriberPolicy(pendingSubscriberPolicy); + + // limit the number of outstanding messages, large enough to use the file store + // or small enough not to blow memory limit + int pendingMessageLimit = 50; + if (pendingSubscriberPolicy instanceof FilePendingSubscriberMessageStoragePolicy) { + pendingMessageLimit = 500; + } + ConstantPendingMessageLimitStrategy pendingMessageLimitStrategy = new ConstantPendingMessageLimitStrategy(); + pendingMessageLimitStrategy.setLimit(pendingMessageLimit); + entry.setPendingMessageLimitStrategy(pendingMessageLimitStrategy); + + // to keep the limit in check and up to date rather than just the first few, evict some + OldestMessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy(); + // whether to check expiry before eviction, default limit 1000 is fine as no ttl set in this test + //messageEvictionStrategy.setEvictExpiredMessagesHighWatermark(1000); + entry.setMessageEvictionStrategy(messageEvictionStrategy); + + // let evicted messaged disappear + entry.setDeadLetterStrategy(null); + policyEntries.add(entry); + + final PolicyMap policyMap = new PolicyMap(); + policyMap.setPolicyEntries(policyEntries); + brokerService.setDestinationPolicy(policyMap); + + return brokerService; + } + + ConnectionFactory createConnectionFactory() throws Exception { + String url = broker.getTransportConnectors().get(0).getServer().getConnectURI().toString(); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url); + factory.setWatchTopicAdvisories(false); + return factory; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java ---------------------------------------------------------------------- 
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
new file mode 100644
index 0000000..0fb9728
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
@@ -0,0 +1,357 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq;

import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import junit.framework.TestCase;

import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises redelivery to a {@link MessageListener}: after a transacted
 * rollback, after the listener throws, and finally delivery to the dead
 * letter queue once the listener keeps throwing.
 */
public class MessageListenerRedeliveryTest extends TestCase {

    private static final Logger LOG = LoggerFactory.getLogger(MessageListenerRedeliveryTest.class);

    private Connection connection;

    @Override
    protected void setUp() throws Exception {
        connection = createConnection();
    }

    /**
     * @see junit.framework.TestCase#tearDown()
     */
    @Override
    protected void tearDown() throws Exception {
        if (connection != null) {
            connection.close();
            connection = null;
        }
    }

    /**
     * @return a policy with no initial delay, a 1000 ms redelivery delay,
     *         exponential back-off with multiplier 2, and at most 3 redeliveries
     */
    protected RedeliveryPolicy getRedeliveryPolicy() {
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setInitialRedeliveryDelay(0);
        redeliveryPolicy.setRedeliveryDelay(1000);
        redeliveryPolicy.setMaximumRedeliveries(3);
        redeliveryPolicy.setBackOffMultiplier((short)2);
        redeliveryPolicy.setUseExponentialBackOff(true);
        return redeliveryPolicy;
    }

    protected Connection createConnection() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false&marshal=true");
        factory.setRedeliveryPolicy(getRedeliveryPolicy());
        return factory.createConnection();
    }

    /**
     * Listener that rolls the session back for its first four deliveries and
     * commits from the fifth one on.
     */
    private static final class TestMessageListener implements MessageListener {

        // Incremented on the delivery thread and read by the test thread.
        public volatile int counter;
        private final Session session;

        public TestMessageListener(Session session) {
            this.session = session;
        }

        @Override
        public void onMessage(Message message) {
            try {
                LOG.info("Message Received: " + message);
                counter++;
                if (counter <= 4) {
                    LOG.info("Message Rollback.");
                    session.rollback();
                } else {
                    LOG.info("Message Commit.");
                    message.acknowledge();
                    session.commit();
                }
            } catch (JMSException e) {
                LOG.error("Error when rolling back transaction", e);
            }
        }
    }

    public void testQueueRollbackConsumerListener() throws JMSException {
        doTestQueueRollbackListener();
    }

    public void testQueueRollbackSessionListener() throws JMSException {
        doTestQueueRollbackListener();
    }

    /**
     * Shared body of the two rollback tests, which were identical in the
     * original: sends one message, lets the listener roll it back until
     * redelivery is exhausted, then sends a second message that the listener
     * commits.
     */
    private void doTestQueueRollbackListener() throws JMSException {
        connection.start();

        Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
        Queue queue = session.createQueue("queue-" + getName());
        MessageProducer producer = createProducer(session, queue);
        Message message = createTextMessage(session);
        producer.send(message);
        session.commit();

        MessageConsumer consumer = session.createConsumer(queue);

        ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer)consumer;
        mc.setRedeliveryPolicy(getRedeliveryPolicy());

        TestMessageListener listener = new TestMessageListener(session);
        consumer.setMessageListener(listener);

        pause(500);
        // first try.. should get 2 since there is no delay on the
        // first redeliver..
        assertEquals(2, listener.counter);

        pause(1000);
        // 2nd redeliver (redelivery after 1 sec)
        assertEquals(3, listener.counter);

        pause(2000);
        // 3rd redeliver (redelivery after 2 seconds) - it should give up after
        // that
        assertEquals(4, listener.counter);

        // create new message
        producer.send(createTextMessage(session));
        session.commit();

        pause(500);
        // it should be committed, so no redelivery
        assertEquals(5, listener.counter);

        pause(1500);
        // no redelivery, counter should still be 5
        assertEquals(5, listener.counter);

        session.close();
    }

    /**
     * Sleeps for the given time. If the thread is interrupted, the interrupt
     * flag is restored and the test fails, since the timing-based assertions
     * that follow would be meaningless after a shortened wait.
     */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            fail("Interrupted while waiting for redelivery");
        }
    }

    /**
     * Two messages are sent; the listener throws until each has been delivered
     * {@code maxDeliveries} times, and the recorded texts must show all
     * deliveries of "1" before any delivery of "2".
     */
    public void testQueueSessionListenerExceptionRetry() throws Exception {
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("queue-" + getName());
        MessageProducer producer = createProducer(session, queue);
        Message message = createTextMessage(session, "1");
        producer.send(message);
        message = createTextMessage(session, "2");
        producer.send(message);

        MessageConsumer consumer = session.createConsumer(queue);

        final CountDownLatch gotMessage = new CountDownLatch(2);
        final AtomicInteger count = new AtomicInteger(0);
        final int maxDeliveries = getRedeliveryPolicy().getMaximumRedeliveries();
        final ArrayList<String> received = new ArrayList<String>();
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                LOG.info("Message Received: " + message);
                try {
                    received.add(((TextMessage) message).getText());
                } catch (JMSException e) {
                    LOG.error("Could not read message text", e);
                    fail(e.toString());
                }
                if (count.incrementAndGet() < maxDeliveries) {
                    throw new RuntimeException(getName() + " force a redelivery");
                }
                // new blood
                count.set(0);
                gotMessage.countDown();
            }
        });

        assertTrue("got message before retry expiry", gotMessage.await(20, TimeUnit.SECONDS));

        // Fail with a readable message rather than an IndexOutOfBoundsException below.
        assertTrue("expected at least " + (maxDeliveries * 2) + " deliveries but got " + received.size(),
                   received.size() >= maxDeliveries * 2);

        for (int i = 0; i < maxDeliveries; i++) {
            assertEquals("got first redelivered: " + i, "1", received.get(i));
        }
        for (int i = maxDeliveries; i < maxDeliveries * 2; i++) {
            assertEquals("got second redelivered: " + i, "2", received.get(i));
        }
        session.close();
    }

    /**
     * The listener always throws; the message must end up on "ActiveMQ.DLQ"
     * carrying a delivery-failure cause that mentions the thrown exception and
     * the redelivery policy.
     */
    public void testQueueSessionListenerExceptionDlq() throws Exception {
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("queue-" + getName());
        MessageProducer producer = createProducer(session, queue);
        Message message = createTextMessage(session);
        producer.send(message);

        final Message[] dlqMessage = new Message[1];
        ActiveMQDestination dlqDestination = new ActiveMQQueue("ActiveMQ.DLQ");
        MessageConsumer dlqConsumer = session.createConsumer(dlqDestination);
        final CountDownLatch gotDlqMessage = new CountDownLatch(1);
        dlqConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                LOG.info("DLQ Message Received: " + message);
                dlqMessage[0] = message;
                gotDlqMessage.countDown();
            }
        });

        MessageConsumer consumer = session.createConsumer(queue);

        final int maxDeliveries = getRedeliveryPolicy().getMaximumRedeliveries();
        final CountDownLatch gotMessage = new CountDownLatch(maxDeliveries);

        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                LOG.info("Message Received: " + message);
                gotMessage.countDown();
                throw new RuntimeException(getName() + " force a redelivery");
            }
        });

        assertTrue("got message before retry expiry", gotMessage.await(20, TimeUnit.SECONDS));

        // check DLQ
        assertTrue("got dlq message", gotDlqMessage.await(20, TimeUnit.SECONDS));

        // check DLQ message cause is captured
        message = dlqMessage[0];
        assertNotNull("dlq message captured", message);
        String cause = message.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);

        LOG.info("DLQ'd message cause reported as: {}", cause);

        // Without this, a missing property would surface as a NullPointerException.
        assertNotNull("dlq delivery failure cause is set", cause);
        assertTrue("cause 'cause' exception is remembered", cause.contains("RuntimeException"));
        assertTrue("is correct exception", cause.contains(getName()));
        assertTrue("cause exception is remembered", cause.contains("Throwable"));
        assertTrue("cause policy is remembered", cause.contains("RedeliveryPolicy"));

        session.close();
    }

    private TextMessage createTextMessage(Session session, String text) throws JMSException {
        return session.createTextMessage(text);
    }

    private TextMessage createTextMessage(Session session) throws JMSException {
        return session.createTextMessage("Hello");
    }

    private MessageProducer createProducer(Session session, Destination queue) throws JMSException {
        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(getDeliveryMode());
        return producer;
    }

    protected int getDeliveryMode() {
        return DeliveryMode.PERSISTENT;
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageTransformationTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageTransformationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageTransformationTest.java new file mode 100644 index 0000000..b4efd32 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/MessageTransformationTest.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq; + +import javax.jms.BytesMessage; +import javax.jms.MapMessage; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import junit.framework.TestCase; +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMapMessage; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQObjectMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQStreamMessage; +import org.apache.activemq.command.ActiveMQTempQueue; +import org.apache.activemq.command.ActiveMQTempTopic; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ActiveMQTopic; + +public class MessageTransformationTest extends TestCase { + + /** + * Sets up the resources of the unit test. + * + * @throws Exception + */ + protected void setUp() throws Exception { + } + + /** + * Clears up the resources used in the unit test. + */ + protected void tearDown() throws Exception { + } + + /** + * Tests transforming destinations into ActiveMQ's destination + * implementation. 
+ */ + public void testTransformDestination() throws Exception { + assertTrue("Transforming a TempQueue destination to an ActiveMQTempQueue", + ActiveMQMessageTransformation.transformDestination((TemporaryQueue)new ActiveMQTempQueue()) instanceof ActiveMQTempQueue); + + assertTrue("Transforming a TempTopic destination to an ActiveMQTempTopic", + ActiveMQMessageTransformation.transformDestination((TemporaryTopic)new ActiveMQTempTopic()) instanceof ActiveMQTempTopic); + + assertTrue("Transforming a Queue destination to an ActiveMQQueue", ActiveMQMessageTransformation.transformDestination((Queue)new ActiveMQQueue()) instanceof ActiveMQQueue); + + assertTrue("Transforming a Topic destination to an ActiveMQTopic", ActiveMQMessageTransformation.transformDestination((Topic)new ActiveMQTopic()) instanceof ActiveMQTopic); + + assertTrue("Transforming a Destination to an ActiveMQDestination", + ActiveMQMessageTransformation.transformDestination((ActiveMQDestination)new ActiveMQTopic()) instanceof ActiveMQDestination); + } + + /** + * Tests transforming messages into ActiveMQ's message implementation. 
+ */ + public void testTransformMessage() throws Exception { + assertTrue("Transforming a BytesMessage message into an ActiveMQBytesMessage", ActiveMQMessageTransformation.transformMessage((BytesMessage)new ActiveMQBytesMessage(), + null) instanceof ActiveMQBytesMessage); + + assertTrue("Transforming a MapMessage message to an ActiveMQMapMessage", + ActiveMQMessageTransformation.transformMessage((MapMessage)new ActiveMQMapMessage(), null) instanceof ActiveMQMapMessage); + + assertTrue("Transforming an ObjectMessage message to an ActiveMQObjectMessage", ActiveMQMessageTransformation.transformMessage((ObjectMessage)new ActiveMQObjectMessage(), + null) instanceof ActiveMQObjectMessage); + + assertTrue("Transforming a StreamMessage message to an ActiveMQStreamMessage", ActiveMQMessageTransformation.transformMessage((StreamMessage)new ActiveMQStreamMessage(), + null) instanceof ActiveMQStreamMessage); + + assertTrue("Transforming a TextMessage message to an ActiveMQTextMessage", + ActiveMQMessageTransformation.transformMessage((TextMessage)new ActiveMQTextMessage(), null) instanceof ActiveMQTextMessage); + + assertTrue("Transforming an ActiveMQMessage message to an ActiveMQMessage", + ActiveMQMessageTransformation.transformMessage(new ActiveMQMessage(), null) instanceof ActiveMQMessage); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java new file mode 100644 index 0000000..0851198 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or 
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.Connection; +import javax.jms.ConnectionConsumer; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.ServerSession; +import javax.jms.ServerSessionPool; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.BlockJUnit4ClassRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// see: https://issues.apache.org/activemq/browse/AMQ-2651 +@RunWith(BlockJUnit4ClassRunner.class) +public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport { + private static final Logger LOG = 
LoggerFactory.getLogger(OnePrefetchAsyncConsumerTest.class); + + private Connection connection; + private ConnectionConsumer connectionConsumer; + private Queue queue; + private final AtomicBoolean completed = new AtomicBoolean(); + private final AtomicBoolean success = new AtomicBoolean(); + + @Ignore("https://issues.apache.org/jira/browse/AMQ-5126") + @Test(timeout = 60 * 1000) + public void testPrefetchExtension() throws Exception { + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(queue); + + // when Msg1 is acked, the PrefetchSubscription will (incorrectly?) increment its prefetchExtension + producer.send(session.createTextMessage("Msg1")); + + // Msg2 will exhaust the ServerSessionPool (since it only has 1 ServerSession) + producer.send(session.createTextMessage("Msg2")); + + // Msg3 will cause the test to fail as it will attempt to retrieve an additional ServerSession from + // an exhausted ServerSessionPool due to the (incorrectly?) 
incremented prefetchExtension in the + // PrefetchSubscription + producer.send(session.createTextMessage("Msg3")); + + session.commit(); + + assertTrue("test completed on time", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return completed.get(); + } + })); + + assertTrue("Attempted to retrieve more than one ServerSession at a time", success.get()); + } + + @Override + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()); + } + + @Before + public void setUp() throws Exception { + setAutoFail(true); + bindAddress = "tcp://localhost:0"; + super.setUp(); + + connection = createConnection(); + queue = createQueue(); + // note the last arg of 1, this becomes the prefetchSize in PrefetchSubscription + connectionConsumer = connection.createConnectionConsumer(queue, null, new TestServerSessionPool(connection), 1); + connection.start(); + } + + @After + public void tearDown() throws Exception { + connectionConsumer.close(); + connection.close(); + super.tearDown(); + } + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService answer = super.createBroker(); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + // ensure prefetch is exact. 
only delivery next when current is acked + defaultEntry.setUsePrefetchExtension(false); + policyMap.setDefaultEntry(defaultEntry); + answer.setDestinationPolicy(policyMap); + return answer; + } + + protected Queue createQueue() { + return new ActiveMQQueue(getDestinationString()); + } + + // simulates a ServerSessionPool with only 1 ServerSession + private class TestServerSessionPool implements ServerSessionPool { + Connection connection; + TestServerSession serverSession; + boolean serverSessionInUse = false; + + public TestServerSessionPool(Connection connection) throws JMSException { + this.connection = connection; + this.serverSession = new TestServerSession(this); + } + + @Override + public ServerSession getServerSession() throws JMSException { + synchronized (this) { + if (serverSessionInUse) { + LOG.info("asked for session while in use, not serialised delivery"); + success.set(false); + completed.set(true); + } + serverSessionInUse = true; + return serverSession; + } + } + } + + private class TestServerSession implements ServerSession { + TestServerSessionPool pool; + Session session; + + public TestServerSession(TestServerSessionPool pool) throws JMSException { + this.pool = pool; + session = pool.connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + session.setMessageListener(new TestMessageListener()); + } + + @Override + public Session getSession() throws JMSException { + return session; + } + + @Override + public void start() throws JMSException { + // use a separate thread to process the message asynchronously + new Thread() { + @Override + public void run() { + // let the session deliver the message + session.run(); + + // commit the tx and return ServerSession to pool + LOG.debug("Waiting on pool"); + synchronized (pool) { + try { + LOG.debug("About to call session.commit"); + session.commit(); + LOG.debug("Commit completed"); + } catch (JMSException e) { + LOG.error("In start", e); + } + pool.serverSessionInUse = false; + } + } + }.start(); + 
} + } + + private class TestMessageListener implements MessageListener { + @Override + public void onMessage(Message message) { + try { + String text = ((TextMessage) message).getText(); + LOG.info("got message: " + text); + if (text.equals("Msg3")) { + // if we get here, Exception in getServerSession() was not thrown, test is + // successful this obviously doesn't happen now, need to fix prefetchExtension + // computation logic in PrefetchSubscription to get here + success.set(true); + completed.set(true); + } else if (text.equals("Msg2")) { + // simulate long message processing so that Msg3 comes when Msg2 is still being + // processed and thus the single ServerSession is in use + TimeUnit.SECONDS.sleep(4); + } + } catch (JMSException e) { + LOG.error("in onMessage", e); + } catch (InterruptedException e) { + LOG.error("in onMessage",e); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java new file mode 100644 index 0000000..3899361 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.util.concurrent.TimeUnit; + +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.broker.BrokerRegistry; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OptimizedAckTest extends TestSupport { + private static final Logger LOG = LoggerFactory.getLogger(OptimizedAckTest.class); + private ActiveMQConnection connection; + + @Override + protected void setUp() throws Exception { + super.setUp(); + connection = (ActiveMQConnection) createConnection(); + connection.setOptimizeAcknowledge(true); + connection.setOptimizeAcknowledgeTimeOut(0); + ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); + prefetchPolicy.setAll(10); + connection.setPrefetchPolicy(prefetchPolicy); + } + + @Override + protected void tearDown() throws Exception { + connection.close(); + super.tearDown(); + } + + public void testReceivedMessageStillInflight() throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("test"); + MessageProducer producer = session.createProducer(queue); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("Hello" + i)); + } + + final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker(); + MessageConsumer consumer 
= session.createConsumer(queue); + + assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); + return 10 == regionBroker.getDestinationStatistics().getInflight().getCount(); + } + })); + + for (int i = 0; i < 6; i++) { + javax.jms.Message msg = consumer.receive(4000); + assertNotNull(msg); + assertEquals("all prefetch is still in flight: " + i, 10, regionBroker.getDestinationStatistics().getInflight().getCount()); + } + + for (int i = 6; i < 10; i++) { + javax.jms.Message msg = consumer.receive(4000); + assertNotNull(msg); + + assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 3 == regionBroker.getDestinationStatistics().getInflight().getCount(); + } + })); + } + } + + public void testVerySlowReceivedMessageStillInflight() throws Exception { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("test"); + MessageProducer producer = session.createProducer(queue); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("Hello" + i)); + } + + final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker(); + MessageConsumer consumer = session.createConsumer(queue); + + assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); + return 10 == regionBroker.getDestinationStatistics().getInflight().getCount(); + } + })); + + for (int i = 0; i < 6; i++) { + Thread.sleep(400); + javax.jms.Message msg = consumer.receive(4000); + assertNotNull(msg); + assertEquals("all prefetch is still in flight: " + i, 
10, regionBroker.getDestinationStatistics().getInflight().getCount()); + } + + for (int i = 6; i < 10; i++) { + Thread.sleep(400); + javax.jms.Message msg = consumer.receive(4000); + assertNotNull(msg); + + assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 3 == regionBroker.getDestinationStatistics().getInflight().getCount(); + } + })); + } + + } + + public void testReceivedMessageNotInFlightAfterScheduledAckFires() throws Exception { + connection.setOptimizedAckScheduledAckInterval(TimeUnit.SECONDS.toMillis(10)); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("test"); + MessageProducer producer = session.createProducer(queue); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("Hello" + i)); + } + + final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker(); + MessageConsumer consumer = session.createConsumer(queue); + + assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); + return 10 == regionBroker.getDestinationStatistics().getInflight().getCount(); + } + })); + + for (int i = 0; i < 6; i++) { + javax.jms.Message msg = consumer.receive(4000); + assertNotNull(msg); + assertEquals("all prefetch is still in flight: " + i, 10, regionBroker.getDestinationStatistics().getInflight().getCount()); + } + + for (int i = 6; i < 10; i++) { + javax.jms.Message msg = consumer.receive(4000); + assertNotNull(msg); + assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 3 == regionBroker.getDestinationStatistics().getInflight().getCount(); + } + })); + } + 
+ assertTrue("After delay the scheduled ack should ack all inflight.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount()); + return 0 == regionBroker.getDestinationStatistics().getInflight().getCount(); + } + })); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java new file mode 100644 index 0000000..83d9179 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/PerDestinationStoreLimitTest.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// see: https://issues.apache.org/activemq/browse/AMQ-2668 +public class PerDestinationStoreLimitTest extends JmsTestSupport { + static final Logger LOG = LoggerFactory.getLogger(PerDestinationStoreLimitTest.class); + final String oneKb = new String(new byte[1024]); + + ActiveMQDestination queueDest = new ActiveMQQueue("PerDestinationStoreLimitTest.Queue"); + ActiveMQDestination topicDest = new ActiveMQTopic("PerDestinationStoreLimitTest.Topic"); + + protected TransportConnector connector; + protected ActiveMQConnection connection; + + public void testDLQAfterBlockTopic() throws Exception { + doTestDLQAfterBlock(topicDest); + } + + public void testDLQAfterBlockQueue() throws Exception { + doTestDLQAfterBlock(queueDest); + } + + public void doTestDLQAfterBlock(ActiveMQDestination destination) throws Exception { + ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory(); + RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); + // Immediately sent to the DLQ on 
rollback, no redelivery + redeliveryPolicy.setMaximumRedeliveries(0); + factory.setRedeliveryPolicy(redeliveryPolicy); + + // Separate connection for consumer so it will not be blocked by filler thread + // sending when it blocks + connection = (ActiveMQConnection)factory.createConnection(); + connections.add(connection); + connection.setClientID("someId"); + connection.start(); + + final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = destination.isQueue() ? + consumerSession.createConsumer(destination) : + consumerSession.createDurableSubscriber((Topic) destination, "Durable"); + + connection = (ActiveMQConnection)factory.createConnection(); + connections.add(connection); + connection.start(); + + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + final MessageProducer producer = session.createProducer(destination); + + final AtomicBoolean done = new AtomicBoolean(true); + final AtomicBoolean keepGoing = new AtomicBoolean(true); + final CountDownLatch fillerStarted = new CountDownLatch(1); + + final AtomicLong sent = new AtomicLong(0); + Thread thread = new Thread("Filler") { + int i; + @Override + public void run() { + while (keepGoing.get()) { + done.set(false); + fillerStarted.countDown(); + try { + producer.send(session.createTextMessage(oneKb + ++i)); + if (i%10 == 0) { + session.commit(); + sent.getAndAdd(10); + LOG.info("committed/sent: " + sent.get()); + } + LOG.info("sent: " + i); + } catch (JMSException e) { + } + } + } + }; + thread.start(); + + assertTrue("filler started..", fillerStarted.await(20, TimeUnit.SECONDS)); + waitForBlocked(done); + + // consume and rollback some so message gets to DLQ + connection = (ActiveMQConnection)factory.createConnection(); + connections.add(connection); + connection.start(); + TextMessage msg; + int received = 0; + for (;received < sent.get(); ++received) { + msg = (TextMessage) consumer.receive(4000); + if (msg == 
null) { + LOG.info("received null on count: " + received); + break; + } + LOG.info("received: " + received + ", msg: " + msg.getJMSMessageID()); + if (received%5==0) { + if (received%3==0) { + // force the use of the DLQ which will use some more store + LOG.info("rollback on : " + received); + consumerSession.rollback(); + } else { + LOG.info("commit on : " + received); + consumerSession.commit(); + } + } + } + LOG.info("Done:: sent: " + sent.get() + ", received: " + received); + keepGoing.set(false); + assertTrue("some were sent:", sent.get() > 0); + assertEquals("received what was committed", sent.get(), received); + } + + protected void waitForBlocked(final AtomicBoolean done) + throws InterruptedException { + while (true) { + Thread.sleep(1000); + // the producer is blocked once the done flag stays true + if (done.get()) { + LOG.info("Blocked...."); + break; + } + done.set(true); + } + } + + protected BrokerService createBroker() throws Exception { + BrokerService service = new BrokerService(); + service.setDeleteAllMessagesOnStartup(true); + + service.setUseJmx(false); + + service.getSystemUsage().getStoreUsage().setLimit(200*1024); + + // allow destination to use 50% of store, leaving 50% for DLQ. 
+ PolicyMap policyMap = new PolicyMap(); + PolicyEntry policy = new PolicyEntry(); + policy.setStoreUsageHighWaterMark(50); + policyMap.put(queueDest, policy); + policyMap.put(topicDest, policy); + service.setDestinationPolicy(policyMap); + + connector = service.addConnector("tcp://localhost:0"); + return service; + } + + public void setUp() throws Exception { + setAutoFail(true); + super.setUp(); + } + + protected void tearDown() throws Exception { + if (connection != null) { + TcpTransport t = (TcpTransport)connection.getTransport().narrow(TcpTransport.class); + t.getTransportListener().onException(new IOException("Disposed.")); + connection.getTransport().stop(); + super.tearDown(); + } + } + + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(connector.getConnectUri()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java new file mode 100644 index 0000000..c18eccd --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlSendFailTest.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ResourceAllocationException;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;

/**
 * Re-runs the {@link ProducerFlowControlTest} scenarios against a broker whose
 * system usage is configured with {@code sendFailIfNoSpace}, so a full
 * destination is reported to the producer as a
 * {@link ResourceAllocationException} instead of blocking the connection.
 */
public class ProducerFlowControlSendFailTest extends ProducerFlowControlTest {

    /** Upper bound, in milliseconds, on how long a test waits for its filler thread to exit. */
    private static final long FILLER_JOIN_TIMEOUT_MS = 5000;

    /**
     * Creates a non-persistent, JMX-less broker with a 1-byte per-destination
     * memory limit, producer flow control enabled and
     * {@code sendFailIfNoSpace} switched on.
     *
     * @return the configured (not yet started) broker
     * @throws Exception if the transport connector cannot be added
     */
    @Override
    protected BrokerService createBroker() throws Exception {
        BrokerService service = new BrokerService();
        service.setPersistent(false);
        service.setUseJmx(false);

        // Setup a destination policy where it takes only 1 message at a time.
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policy = new PolicyEntry();
        policy.setMemoryLimit(1);
        policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
        policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
        policy.setProducerFlowControl(true);
        policyMap.setDefaultEntry(policy);
        service.setDestinationPolicy(policyMap);

        service.getSystemUsage().setSendFailIfNoSpace(true);

        connector = service.addConnector("tcp://localhost:0");
        return service;
    }

    /** Intentionally a no-op in this configuration. */
    @Override
    public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws Exception {
        // with sendFailIfNoSpace set, there is no blocking of the connection
    }

    /** Intentionally a no-op in this configuration. */
    @Override
    public void testAsyncPubisherRecoverAfterBlock() throws Exception {
        // sendFail means no flow control window as there is no producer ack, just an exception
    }

    /**
     * Fills {@code queueA} from a background thread using async send, waits
     * until the resource limit is signalled, then consumes and acknowledges
     * up to 10 messages.
     *
     * @throws Exception on any JMS failure or if interrupted while waiting
     */
    @Override
    public void testPubisherRecoverAfterBlock() throws Exception {
        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
        // with sendFail, there must be no flow control window
        // sendFail is an alternative flow control mechanism that does not block
        factory.setUseAsyncSend(true);
        connection = (ActiveMQConnection) factory.createConnection();
        connections.add(connection);
        connection.start();

        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        final MessageProducer producer = session.createProducer(queueA);

        final AtomicBoolean keepGoing = new AtomicBoolean(true);

        Thread thread = new Thread("Filler") {
            @Override
            public void run() {
                while (keepGoing.get()) {
                    try {
                        producer.send(session.createTextMessage("Test message"));
                        if (gotResourceException.get()) {
                            // do not flood the broker with requests when full as we are sending async and they
                            // will be limited by the network buffers
                            Thread.sleep(200);
                        }
                    } catch (InterruptedException e) {
                        // preserve the interrupt and stop instead of spinning on a failing sleep
                        Thread.currentThread().interrupt();
                        return;
                    } catch (Exception e) {
                        // with async send, there will be no exceptions
                        e.printStackTrace();
                    }
                }
            }
        };
        thread.start();
        try {
            waitForBlockedOrResourceLimit(new AtomicBoolean(false));

            // resourceException on second message, resumption if we
            // can receive 10
            receiveAndAcknowledgeUpTo(session, 10);
        } finally {
            // always stop the filler, even when the wait or the receive fails
            stopFillerThread(keepGoing, thread);
        }
    }

    /**
     * Same scenario with synchronous send and no connection-level exception
     * listener: each rejected send surfaces as a
     * {@link ResourceAllocationException} thrown from {@code send}, and the
     * test requires that more than five of them were observed.
     *
     * @throws Exception on any JMS failure or if interrupted while waiting
     */
    public void testPubisherRecoverAfterBlockWithSyncSend() throws Exception {
        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
        factory.setExceptionListener(null);
        factory.setUseAsyncSend(false);
        connection = (ActiveMQConnection) factory.createConnection();
        connections.add(connection);
        connection.start();

        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        final MessageProducer producer = session.createProducer(queueA);

        final AtomicBoolean keepGoing = new AtomicBoolean(true);
        final AtomicInteger exceptionCount = new AtomicInteger(0);
        Thread thread = new Thread("Filler") {
            @Override
            public void run() {
                while (keepGoing.get()) {
                    try {
                        producer.send(session.createTextMessage("Test message"));
                    } catch (JMSException arg0) {
                        // only resource exhaustion is counted; other JMS failures are ignored here
                        if (arg0 instanceof ResourceAllocationException) {
                            gotResourceException.set(true);
                            exceptionCount.incrementAndGet();
                        }
                    }
                }
            }
        };
        thread.start();
        try {
            waitForBlockedOrResourceLimit(new AtomicBoolean(false));

            // resourceException on second message, resumption if we
            // can receive 10
            receiveAndAcknowledgeUpTo(session, 10);

            // NOTE: the comparison is strict, so at least 6 exceptions are required
            assertTrue("we were blocked at least 5 times", 5 < exceptionCount.get());
        } finally {
            // a failed assertion must not leave the filler spinning
            stopFillerThread(keepGoing, thread);
        }
    }

    /**
     * Builds a connection factory for the broker's connector whose exception
     * listener raises {@code gotResourceException} when a
     * {@link ResourceAllocationException} is reported.
     *
     * @return the connection factory used by the tests in this class
     * @throws Exception if the connector URI cannot be obtained
     */
    @Override
    protected ConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connector.getConnectUri());
        connectionFactory.setExceptionListener(new ExceptionListener() {
            @Override
            public void onException(JMSException arg0) {
                if (arg0 instanceof ResourceAllocationException) {
                    gotResourceException.set(true);
                }
            }
        });
        return connectionFactory;
    }

    /**
     * Creates a consumer on {@code queueA} and performs {@code attempts}
     * receives of up to one second each, acknowledging every message that
     * arrives.
     */
    private void receiveAndAcknowledgeUpTo(Session session, int attempts) throws JMSException {
        MessageConsumer consumer = session.createConsumer(queueA);
        for (int idx = 0; idx < attempts; ++idx) {
            TextMessage msg = (TextMessage) consumer.receive(1000);
            if (msg != null) {
                msg.acknowledge();
            }
        }
    }

    /** Clears the run flag and waits a bounded time for the filler thread to finish. */
    private static void stopFillerThread(AtomicBoolean keepGoing, Thread filler) throws InterruptedException {
        keepGoing.set(false);
        filler.join(FILLER_JOIN_TIMEOUT_MS);
    }
}
