http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2413Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2413Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2413Test.java new file mode 100644 index 0000000..25a95bc --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2413Test.java @@ -0,0 +1,346 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Vector; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import junit.framework.Test; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.CombinationTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ2413Test extends CombinationTestSupport implements MessageListener { + private static final Logger LOG = LoggerFactory.getLogger(AMQ2413Test.class); + BrokerService broker; + private ActiveMQConnectionFactory factory; + + private static final int HANG_THRESHOLD = 60; + private static final int SEND_COUNT = 1000; + private static final int RECEIVER_THINK_TIME = 1; + private static final int CONSUMER_COUNT = 1; + private static final int PRODUCER_COUNT = 50; + private static final int TO_SEND = SEND_COUNT / PRODUCER_COUNT; + + public int deliveryMode = DeliveryMode.NON_PERSISTENT; + public int ackMode = Session.DUPS_OK_ACKNOWLEDGE; + public 
boolean useVMCursor = false; + public boolean useOptimizeAcks = false; + + private final ArrayList<Service> services = new ArrayList<Service>(CONSUMER_COUNT + PRODUCER_COUNT); + AtomicInteger count = new AtomicInteger(0); + Semaphore receivedMessages; + AtomicBoolean running = new AtomicBoolean(false); + + public void initCombos() { + addCombinationValues("deliveryMode", new Object[] { DeliveryMode.PERSISTENT, DeliveryMode.NON_PERSISTENT }); + addCombinationValues("ackMode", new Object[] { Session.DUPS_OK_ACKNOWLEDGE, Session.AUTO_ACKNOWLEDGE }); + addCombinationValues("useVMCursor", new Object[] { true, false }); + // addCombinationValues("useOptimizeAcks", new Object[] {true, false}); + } + + @Override + protected void setUp() throws Exception { + broker = new BrokerService(); + broker.setDataDirectory("target" + File.separator + "test-data" + File.separator + "AMQ2401Test"); + broker.setDeleteAllMessagesOnStartup(true); + + KahaDBPersistenceAdapter kahaDb = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); + kahaDb.setConcurrentStoreAndDispatchQueues(false); + broker.addConnector("tcp://0.0.0.0:2401"); + PolicyMap policies = new PolicyMap(); + PolicyEntry entry = new PolicyEntry(); + entry.setMemoryLimit(1024 * 1024); + entry.setProducerFlowControl(true); + if (useVMCursor) { + entry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy()); + } + entry.setQueue(">"); + policies.setDefaultEntry(entry); + broker.setDestinationPolicy(policies); + broker.start(); + broker.waitUntilStarted(); + + count.set(0); + receivedMessages = new Semaphore(0); + + factory = new ActiveMQConnectionFactory("tcp://0.0.0.0:2401"); + // factory = new ActiveMQConnectionFactory("vm://localhost?broker.useJmx=false&broker.persistent=false"); + setAutoFail(true); + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + running.set(false); + for (Service service : services) { + service.close(); + } + + broker.stop(); + broker.waitUntilStopped(); + 
+ super.tearDown(); + } + + public void testReceipt() throws Exception { + + running.set(true); + TestProducer p = null; + TestConsumer c = null; + try { + + for (int i = 0; i < CONSUMER_COUNT; i++) { + TestConsumer consumer = new TestConsumer(); + consumer.start(); + services.add(consumer); + } + for (int i = 0; i < PRODUCER_COUNT; i++) { + TestProducer producer = new TestProducer(i); + producer.start(); + services.add(producer); + } + waitForMessageReceipt(); + + } finally { + if (p != null) { + p.close(); + } + + if (c != null) { + c.close(); + } + } + + } + + /* + * (non-Javadoc) + * + * @see javax.jms.MessageListener#onMessage(javax.jms.Message) + */ + @Override + public void onMessage(Message message) { + receivedMessages.release(); + if (count.incrementAndGet() % 100 == 0) { + LOG.info("Received message " + count); + } + track(message); + if (RECEIVER_THINK_TIME > 0) { + try { + Thread.sleep(RECEIVER_THINK_TIME); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + } + + HashMap<ProducerId, boolean[]> tracker = new HashMap<ProducerId, boolean[]>(); + + private synchronized void track(Message message) { + try { + MessageId id = new MessageId(message.getJMSMessageID()); + ProducerId pid = id.getProducerId(); + int seq = (int) id.getProducerSequenceId(); + boolean[] ids = tracker.get(pid); + if (ids == null) { + ids = new boolean[TO_SEND + 1]; + ids[seq] = true; + tracker.put(pid, ids); + } else { + assertTrue("not already received: " + id, !ids[seq]); + ids[seq] = true; + } + } catch (Exception e) { + LOG.error(e.toString()); + } + } + + /** + * @throws InterruptedException + * @throws TimeoutException + * + */ + private void waitForMessageReceipt() throws InterruptedException, TimeoutException { + try { + while (count.get() < SEND_COUNT) { + if (!receivedMessages.tryAcquire(HANG_THRESHOLD, TimeUnit.SECONDS)) { + if (count.get() == SEND_COUNT) + break; + verifyTracking(); + throw new TimeoutException("@count=" + count.get() + 
" Message not received for more than " + HANG_THRESHOLD + " seconds"); + } + } + } finally { + running.set(false); + } + } + + private void verifyTracking() { + Vector<MessageId> missing = new Vector<MessageId>(); + for (ProducerId pid : tracker.keySet()) { + boolean[] ids = tracker.get(pid); + for (int i = 1; i < TO_SEND + 1; i++) { + if (!ids[i]) { + missing.add(new MessageId(pid, i)); + } + } + } + assertTrue("No missing messages: " + missing, missing.isEmpty()); + } + + private interface Service { + public void start() throws Exception; + + public void close(); + } + + private class TestProducer implements Runnable, Service { + Thread thread; + BytesMessage message; + Connection connection; + Session session; + MessageProducer producer; + + TestProducer(int id) throws Exception { + thread = new Thread(this, "TestProducer-" + id); + connection = factory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); + producer = session.createProducer(session.createQueue("AMQ2401Test")); + } + + @Override + public void start() { + thread.start(); + } + + @Override + public void run() { + + int i = 1; + for (; i <= TO_SEND; i++) { + try { + + if (+i % 100 == 0) { + LOG.info(Thread.currentThread().getName() + " Sending message " + i); + } + message = session.createBytesMessage(); + message.writeBytes(new byte[1024]); + producer.setDeliveryMode(deliveryMode); + producer.send(message); + } catch (JMSException jmse) { + jmse.printStackTrace(); + break; + } + } + LOG.info(Thread.currentThread().getName() + " Sent: " + (i - 1)); + } + + @Override + public void close() { + try { + connection.close(); + } catch (JMSException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } + + private class TestConsumer implements Runnable, Service { + ActiveMQConnection connection; + Session session; + MessageConsumer consumer; + + TestConsumer() throws Exception { + factory.setOptimizeAcknowledge(false); + 
connection = (ActiveMQConnection) factory.createConnection(); + if (useOptimizeAcks) { + connection.setOptimizeAcknowledge(true); + } + + session = connection.createSession(false, ackMode); + consumer = session.createConsumer(session.createQueue("AMQ2401Test")); + + consumer.setMessageListener(AMQ2413Test.this); + } + + @Override + public void start() throws Exception { + connection.start(); + } + + @Override + public void close() { + try { + connection.close(); + } catch (JMSException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + /* + * (non-Javadoc) + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + while (running.get()) { + try { + onMessage(consumer.receive()); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } + + public static Test suite() { + return suite(AMQ2413Test.class); + } +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java new file mode 100644 index 0000000..cd447f4 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2439Test.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.bugs; + +import java.net.URI; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.broker.jmx.BrokerView; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ2439Test extends JmsMultipleBrokersTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(AMQ2439Test.class); + Destination dest; + + + public void testDuplicatesThroughNetwork() throws Exception { + assertEquals("received expected amount", 500, receiveExactMessages("BrokerB", 500)); + assertEquals("received expected amount", 500, receiveExactMessages("BrokerB", 500)); + validateQueueStats(); + } + + private void validateQueueStats() throws Exception { + final BrokerView brokerView = brokers.get("BrokerA").broker.getAdminView(); + assertEquals("enequeue is correct", 1000, brokerView.getTotalEnqueueCount()); + + assertTrue("dequeue is correct", Wait.waitFor(new Wait.Condition() { + public boolean isSatisified() throws Exception { + LOG.info("dequeue count (want 1000), is : " + brokerView.getTotalDequeueCount()); + return 1000 == brokerView.getTotalDequeueCount(); + } + })); + } + + protected int receiveExactMessages(String brokerName, int msgCount) throws Exception { + + BrokerItem brokerItem = brokers.get(brokerName); + Connection connection = brokerItem.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(dest); + + Message msg; + int i; + for (i = 0; i < msgCount; i++) { + msg = consumer.receive(1000); + if (msg == null) { + break; + } + } + + connection.close(); + brokerItem.connections.remove(connection); + + return i; + } + + public void setUp() throws Exception { + super.setUp(); + 
createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA?persistent=true&deleteAllMessagesOnStartup=true&advisorySupport=false")); + createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB?persistent=true&deleteAllMessagesOnStartup=true&useJmx=false")); + bridgeBrokers("BrokerA", "BrokerB"); + + startAllBrokers(); + + // Create queue + dest = createDestination("TEST.FOO", false); + sendMessages("BrokerA", dest, 1000); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2489Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2489Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2489Test.java new file mode 100644 index 0000000..b581e6d --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2489Test.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.TestSupport; +import org.apache.activemq.command.ActiveMQQueue; + +/** + * In CLIENT_ACKNOWLEDGE and INDIVIDUAL_ACKNOWLEDGE modes following exception + * occurs when ASYNCH consumers acknowledges messages in not in order they + * received the messages. + * <p> + * Exception thrown on broker side: + * <p> + * {@code javax.jms.JMSException: Could not correlate acknowledgment with + * dispatched message: MessageAck} + * + * @author daroo + */ +public class AMQ2489Test extends TestSupport { + private final static String SEQ_NUM_PROPERTY = "seqNum"; + + private final static int TOTAL_MESSAGES_CNT = 2; + private final static int CONSUMERS_CNT = 2; + + private final CountDownLatch LATCH = new CountDownLatch(TOTAL_MESSAGES_CNT); + + private Connection connection; + + protected void setUp() throws Exception { + super.setUp(); + connection = createConnection(); + } + + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + connection = null; + } + super.tearDown(); + } + + public void testUnorderedClientAcknowledge() throws Exception { + doUnorderedAck(Session.CLIENT_ACKNOWLEDGE); + } + + public void testUnorderedIndividualAcknowledge() throws Exception { + doUnorderedAck(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); + } + + /** + * Main test method + * + * @param acknowledgmentMode + * - ACK mode to be used by consumers + * 
@throws Exception + */ + protected void doUnorderedAck(int acknowledgmentMode) throws Exception { + List<Consumer> consumers = null; + Session producerSession = null; + + connection.start(); + // Because exception is thrown on broker side only, let's set up + // exception listener to get it + final TestExceptionListener exceptionListener = new TestExceptionListener(); + connection.setExceptionListener(exceptionListener); + try { + consumers = new ArrayList<Consumer>(); + // start customers + for (int i = 0; i < CONSUMERS_CNT; i++) { + consumers.add(new Consumer(acknowledgmentMode)); + } + + // produce few test messages + producerSession = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + final MessageProducer producer = producerSession + .createProducer(new ActiveMQQueue(getQueueName())); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + for (int i = 0; i < TOTAL_MESSAGES_CNT; i++) { + final Message message = producerSession + .createTextMessage("test"); + // assign each message sequence number + message.setIntProperty(SEQ_NUM_PROPERTY, i); + producer.send(message); + } + + // during each onMessage() calls consumers decreases the LATCH + // counter. + // + // so, let's wait till all messages are consumed. + // + LATCH.await(); + + // wait a bit more to give exception listener a chance be populated + // with + // broker's error + TimeUnit.SECONDS.sleep(1); + + assertFalse(exceptionListener.getStatusText(), exceptionListener.hasExceptions()); + + } finally { + if (producerSession != null) + producerSession.close(); + + if (consumers != null) { + for (Consumer c : consumers) { + c.close(); + } + } + } + } + + protected String getQueueName() { + return getClass().getName() + "." 
+ getName(); + } + + public final class Consumer implements MessageListener { + final Session session; + + private Consumer(int acknowledgmentMode) { + try { + session = connection.createSession(false, acknowledgmentMode); + final Queue queue = session.createQueue(getQueueName() + + "?consumer.prefetchSize=1"); + final MessageConsumer consumer = session.createConsumer(queue); + consumer.setMessageListener(this); + } catch (JMSException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + public void onMessage(Message message) { + try { + // retrieve sequence number assigned by producer... + final int seqNum = message.getIntProperty(SEQ_NUM_PROPERTY); + + // ...and let's delay every second message a little bit before + // acknowledgment + if ((seqNum % 2) == 0) { + System.out.println("Delayed message sequence numeber: " + + seqNum); + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + message.acknowledge(); + } catch (JMSException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } finally { + // decrease LATCH counter in the main test method. 
+ LATCH.countDown(); + } + } + + private void close() { + if (session != null) { + try { + session.close(); + } catch (JMSException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + } + } + + public final class TestExceptionListener implements ExceptionListener { + private final java.util.Queue<Exception> exceptions = new ConcurrentLinkedQueue<Exception>(); + + public void onException(JMSException e) { + exceptions.add(e); + } + + public boolean hasExceptions() { + return exceptions.isEmpty() == false; + } + + public String getStatusText() { + final StringBuilder str = new StringBuilder(); + str.append("Exceptions count on broker side: " + exceptions.size() + + ".\nMessages:\n"); + for (Exception e : exceptions) { + str.append(e.getMessage() + "\n\n"); + } + return str.toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java new file mode 100644 index 0000000..669066e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2512Test.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.kahadb.KahaDBStore; +import org.apache.activemq.util.IOHelper; + +public class AMQ2512Test extends EmbeddedBrokerTestSupport { + private static Connection connection; + private final static String QUEUE_NAME = "dee.q"; + private final static int INITIAL_MESSAGES_CNT = 1000; + private final static int WORKER_INTERNAL_ITERATIONS = 100; + private final static int TOTAL_MESSAGES_CNT = INITIAL_MESSAGES_CNT * WORKER_INTERNAL_ITERATIONS + + INITIAL_MESSAGES_CNT; + private final static byte[] payload = new byte[5 * 1024]; + private final static String TEXT = new String(payload); + + private final static String PRP_INITIAL_ID = "initial-id"; + private final static String PRP_WORKER_ID = "worker-id"; + + private final static CountDownLatch LATCH = new 
CountDownLatch(TOTAL_MESSAGES_CNT); + + private final static AtomicInteger ON_MSG_COUNTER = new AtomicInteger(); + + public void testKahaDBFailure() throws Exception { + final ConnectionFactory fac = new ActiveMQConnectionFactory(this.bindAddress); + connection = fac.createConnection(); + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Queue queue = session.createQueue(QUEUE_NAME); + final MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + connection.start(); + + final long startTime = System.nanoTime(); + + final List<Consumer> consumers = new ArrayList<Consumer>(); + for (int i = 0; i < 20; i++) { + consumers.add(new Consumer("worker-" + i)); + } + + for (int i = 0; i < INITIAL_MESSAGES_CNT; i++) { + final TextMessage msg = session.createTextMessage(TEXT); + msg.setStringProperty(PRP_INITIAL_ID, "initial-" + i); + producer.send(msg); + } + + LATCH.await(); + final long endTime = System.nanoTime(); + System.out.println("Total execution time = " + + TimeUnit.MILLISECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS) + " [ms]."); + System.out.println("Rate = " + TOTAL_MESSAGES_CNT + / TimeUnit.SECONDS.convert(endTime - startTime, TimeUnit.NANOSECONDS) + " [msg/s]."); + + for (Consumer c : consumers) { + c.close(); + } + connection.close(); + } + + private final static class Consumer implements MessageListener { + private final String name; + private final Session session; + private final MessageProducer producer; + + private Consumer(String name) { + this.name = name; + try { + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final Queue queue = session.createQueue(QUEUE_NAME + "?consumer.prefetchSize=10"); + producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + final MessageConsumer consumer = session.createConsumer(queue); + consumer.setMessageListener(this); + } catch (JMSException e) { + 
e.printStackTrace(); + throw new RuntimeException(e); + } + } + + public void onMessage(Message message) { + final TextMessage msg = (TextMessage) message; + try { + if (!msg.propertyExists(PRP_WORKER_ID)) { + for (int i = 0; i < WORKER_INTERNAL_ITERATIONS; i++) { + final TextMessage newMsg = session.createTextMessage(msg.getText()); + newMsg.setStringProperty(PRP_WORKER_ID, name + "-" + i); + newMsg.setStringProperty(PRP_INITIAL_ID, msg.getStringProperty(PRP_INITIAL_ID)); + producer.send(newMsg); + } + } + msg.acknowledge(); + + } catch (JMSException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } finally { + final int onMsgCounter = ON_MSG_COUNTER.getAndIncrement(); + if (onMsgCounter % 1000 == 0) { + System.out.println("message received: " + onMsgCounter); + } + LATCH.countDown(); + } + } + + private void close() { + if (session != null) { + try { + session.close(); + } catch (JMSException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + } + } + + @Override + protected void setUp() throws Exception { + bindAddress = "tcp://0.0.0.0:61617"; + super.setUp(); + } + + @Override + protected BrokerService createBroker() throws Exception { + File dataFileDir = new File("target/test-amq-2512/datadb"); + IOHelper.mkdirs(dataFileDir); + IOHelper.deleteChildren(dataFileDir); + KahaDBStore kaha = new KahaDBStore(); + kaha.setDirectory(dataFileDir); + BrokerService answer = new BrokerService(); + answer.setPersistenceAdapter(kaha); + + kaha.setEnableJournalDiskSyncs(false); + //kaha.setIndexCacheSize(10); + answer.setDataDirectoryFile(dataFileDir); + answer.setUseJmx(false); + answer.addConnector(bindAddress); + return answer; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java ---------------------------------------------------------------------- diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java new file mode 100644 index 0000000..b9cfbd9 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2513Test.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.management.ObjectName; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.DestinationViewMBean; +import org.apache.activemq.broker.jmx.ManagementContext; + +/** + * This unit test verifies an issue when + * javax.management.InstanceNotFoundException is thrown after subsequent startups when + * managementContext createConnector="false" + * + */ +public class AMQ2513Test extends TestCase { + + private BrokerService broker; + private String connectionUri; + + void createBroker(boolean deleteAllMessagesOnStartup) throws Exception { + broker = new BrokerService(); + broker.setBrokerName("localhost"); + broker.setUseJmx(true); + broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); + broker.addConnector("tcp://localhost:0"); + + ManagementContext ctx = new ManagementContext(); + //if createConnector == true everything is fine + ctx.setCreateConnector(false); + broker.setManagementContext(ctx); + + broker.start(); + broker.waitUntilStarted(); + + connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); + } + + public void testJmx() throws Exception{ + createBroker(true); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue("test")); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + connection.start(); + + producer.send(session.createTextMessage("test123")); + + DestinationViewMBean dv = createView(); + assertTrue(dv.getQueueSize() > 0); + + connection.close(); + + broker.stop(); + 
broker.waitUntilStopped(); + + createBroker(false); + factory = new ActiveMQConnectionFactory(connectionUri); + connection = factory.createConnection(); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = session.createProducer(session.createQueue("test")); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + connection.start(); + producer.send(session.createTextMessage("test123")); + connection.close(); + + dv = createView(); + assertTrue(dv.getQueueSize() > 0); + + broker.stop(); + broker.waitUntilStopped(); + + } + + DestinationViewMBean createView() throws Exception { + String domain = "org.apache.activemq"; + ObjectName name = new ObjectName(domain + ":type=Broker,brokerName=localhost," + + "destinationType=Queue,destinationName=test"); + return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, + true); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2528Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2528Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2528Test.java new file mode 100644 index 0000000..80c036f --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2528Test.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.activemq.bugs;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.region.Queue;
import org.junit.Assert;

/**
 * This test demonstrates a bug in which calling
 * {@code Queue#removeMatchingMessages("")} generates an exception, whereas the
 * JMS specification states that an empty selector is valid.
 */
public class AMQ2528Test extends EmbeddedBrokerTestSupport {

    /**
     * Switches the inherited test destination to a queue before the broker
     * and destination are set up by the superclass.
     */
    @Override
    protected void setUp() throws Exception {
        useTopic = false;
        super.setUp();
    }

    /**
     * Enqueues messages numbered through an int property, removes the lower
     * half with a selector on that property, and then checks that
     * {@code removeMatchingMessages("")} removes exactly the rest.
     */
    public void testRemoveMatchingMessages() throws Exception {
        final int total = 100;
        final int lowerHalf = total / 2;
        final String idProperty = "id";

        enqueueNumberedMessages(total, idProperty);

        Queue queue = (Queue) broker.getRegionBroker()
                .getDestinations(destination)
                .iterator()
                .next();

        // The selector matches ids 0 .. lowerHalf - 1.
        String lowerHalfSelector = idProperty + " < " + lowerHalf;
        Assert.assertEquals(lowerHalf, queue.removeMatchingMessages(lowerHalfSelector));

        // The empty selector has to match everything that is left.
        Assert.assertEquals(total - lowerHalf, queue.removeMatchingMessages(""));
    }

    /**
     * Sends {@code count} empty messages to the test destination, setting the
     * int property {@code propertyName} to 0, 1, ..., count - 1 in send order.
     * The connection is closed whether or not sending succeeds.
     */
    private void enqueueNumberedMessages(int count, String propertyName) throws Exception {
        Connection connection = createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer sender = session.createProducer(destination);
            for (int sequence = 0; sequence < count; sequence++) {
                Message numbered = session.createMessage();
                numbered.setIntProperty(propertyName, sequence);
                sender.send(numbered);
            }
            sender.close();
            session.close();
        } finally {
            connection.close();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java new file mode 100644 index 0000000..533ae0c --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2571Test.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.TextMessage; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; + +public class AMQ2571Test extends EmbeddedBrokerTestSupport { + + public void testTempQueueClosing() { + try { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.bindAddress); + connectionFactory.setAlwaysSyncSend(true); + + // First create session that will own the TempQueue + Connection connectionA = connectionFactory.createConnection(); + connectionA.start(); + + Session sessionA = connectionA.createSession(false, Session.AUTO_ACKNOWLEDGE); + + TemporaryQueue tempQueue = sessionA.createTemporaryQueue(); + + // Next, create session that will put messages on the queue. + Connection connectionB = connectionFactory.createConnection(); + connectionB.start(); + + Session sessionB = connectionB.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create a producer for connection B. + final MessageProducer producerB = sessionB.createProducer(tempQueue); + producerB.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + final TextMessage message = sessionB.createTextMessage("Testing AMQ TempQueue."); + + Thread sendingThread = new Thread(new Runnable() { + public void run() { + try { + long end = System.currentTimeMillis() + 5*60*1000; + // wait for exception on send + while (System.currentTimeMillis() < end) { + producerB.send(message); + } + } catch (JMSException e) { + e.printStackTrace(); + } + } + }); + + // Send 5000 messages. + sendingThread.start(); + // Now close connection A. This will remove the TempQueue. + connectionA.close(); + // Wait for the thread to finish. 
+ sendingThread.join(5*60*1000); + + // Sleep for a while to make sure that we should know that the + // TempQueue is gone. + //Thread.sleep(50); + + // Now we test if we are able to send again. + try { + producerB.send(message); + fail("Involuntary recreated temporary queue."); + } catch (JMSException e) { + // Got exception, just as we wanted because the creator of + // the TempQueue had closed the connection prior to the send. + assertTrue("TempQueue does not exist anymore.", true); + } + } catch (Exception e) { + fail("Unexpected exception " + e); + } + } + + @Override + protected void setUp() throws Exception { + bindAddress = "vm://localhost"; + setAutoFail(true); + super.setUp(); + } + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setPersistent(false); + answer.setUseJmx(false); + return answer; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java new file mode 100644 index 0000000..2bcb983 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2580Test.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.activemq.bugs;

import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;

/**
 * Smoke test for a durable topic subscription with a selector: the
 * subscription is created, the client disconnects, messages are published
 * (five that fail the selector and one that passes it), and a re-connected
 * subscriber must then receive a message.
 */
public class AMQ2580Test extends TestSupport {

    private static final Logger LOG = LoggerFactory.getLogger(AMQ2580Test.class);

    private static final String TOPIC_NAME = "topicName";
    private static final String CLIENT_ID = "client_id";
    private static final String textOfSelectedMsg = "good_message";

    protected TopicConnection connection;

    private Topic topic;
    private Session session;
    private MessageProducer producer;
    private ConnectionFactory connectionFactory;
    private BrokerService service;

    public static Test suite() {
        return suite(AMQ2580Test.class);
    }

    @Override
    protected void setUp() throws Exception {
        super.setUp();
        initDurableBroker();
        initConnectionFactory();
        initTopic();
    }

    /**
     * Closes the client and stops the broker. Each step runs even if the one
     * before it throws, so a client-side problem cannot keep the broker alive.
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            shutdownClient();
        } finally {
            try {
                if (service != null) {
                    service.stop();
                }
            } finally {
                super.tearDown();
            }
        }
    }

    /** Creates and starts the shared connection unless one is already open. */
    private void initConnection() throws JMSException {
        if (connection == null) {
            LOG.info("Initializing connection");

            connection = (TopicConnection) connectionFactory.createConnection();
            connection.start();
        }
    }

    public void initCombosForTestTopicIsDurableSmokeTest() throws Exception {
        addCombinationValues("defaultPersistenceAdapter", PersistenceAdapterChoice.values());
    }

    public void testTopicIsDurableSmokeTest() throws Exception {

        initClient();
        MessageConsumer consumer = createMessageConsumer();
        LOG.info("Consuming message");
        assertNull(consumer.receive(1));
        shutdownClient();
        consumer.close();

        sendMessages();
        shutdownClient();

        initClient();
        consumer = createMessageConsumer();

        LOG.info("Consuming message");
        TextMessage answer1 = (TextMessage) consumer.receive(1000);
        assertNotNull("we got our message", answer1);

        consumer.close();
    }

    /**
     * Creates the durable subscriber on the current session. The subscription
     * name is TOPIC_NAME and the selector is name='value'.
     */
    private MessageConsumer createMessageConsumer() throws JMSException {
        LOG.info("creating durable subscriber");
        return session.createDurableSubscriber(topic,
                TOPIC_NAME,
                "name='value'",
                false);
    }

    private void initClient() throws JMSException {
        LOG.info("Initializing client");

        initConnection();
        initSession();
    }

    /**
     * Closes the session and the connection, whichever of them exist, and
     * clears both fields. Safe to call when the client is already shut down,
     * which is what tearDown does after a test that failed between a
     * shutdownClient() and the next initClient().
     */
    private void shutdownClient()
            throws JMSException {
        LOG.info("Closing session and connection");
        try {
            if (session != null) {
                session.close();
            }
        } finally {
            session = null;
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                connection = null;
            }
        }
    }

    /**
     * Publishes five messages whose "name" property is "not_value" followed
     * by one whose "name" property is "value".
     */
    private void sendMessages()
            throws JMSException {
        initConnection();

        initSession();

        LOG.info("Creating producer");
        producer = session.createProducer(topic);

        sendMessageThatFailsSelection();

        sendMessage(textOfSelectedMsg, "value");
    }

    private void initSession() throws JMSException {
        LOG.info("Initializing session");
        session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    private void sendMessageThatFailsSelection() throws JMSException {
        for (int i = 0; i < 5; i++) {
            String textOfNotSelectedMsg = "Msg_" + i;
            sendMessage(textOfNotSelectedMsg, "not_value");
            LOG.info("#");
        }
    }

    /**
     * Sends one text message carrying the given value in its "name" string
     * property, which is the property the subscriber's selector tests.
     */
    private void sendMessage(
            String msgText,
            String propertyValue) throws JMSException {
        LOG.info("Creating message: " + msgText);
        TextMessage messageToSelect = session.createTextMessage(msgText);
        messageToSelect.setStringProperty("name", propertyValue);
        LOG.info("Sending message");
        producer.send(messageToSelect);
    }

    protected void initConnectionFactory() throws Exception {
        ActiveMQConnectionFactory activeMqConnectionFactory = createActiveMqConnectionFactory();
        connectionFactory = activeMqConnectionFactory;
    }


    /**
     * Builds a failover connection factory pointing at the broker's first
     * transport connector, with topic advisories off, a durable topic
     * prefetch of 2 and the fixed client id the durable subscription needs.
     */
    private ActiveMQConnectionFactory createActiveMqConnectionFactory() throws Exception {
        ActiveMQConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory(
                "failover:" + service.getTransportConnectors().get(0).getConnectUri().toString());
        activeMqConnectionFactory.setWatchTopicAdvisories(false);
        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
        prefetchPolicy.setDurableTopicPrefetch(2);
        prefetchPolicy.setOptimizeDurableTopicPrefetch(2);
        activeMqConnectionFactory.setPrefetchPolicy(prefetchPolicy);
        activeMqConnectionFactory.setClientID(CLIENT_ID);
        return activeMqConnectionFactory;
    }

    /**
     * Starts a persistent broker on an ephemeral TCP port using the
     * persistence adapter selected for the current combination.
     */
    private void initDurableBroker() throws Exception {
        service = new BrokerService();
        setDefaultPersistenceAdapter(service);
        service.setDeleteAllMessagesOnStartup(true);
        service.setAdvisorySupport(false);
        service.setTransportConnectorURIs(new String[]{"tcp://localhost:0"});
        service.setPersistent(true);
        service.setUseJmx(false);
        service.start();

    }

    private void initTopic() throws JMSException {
        initConnection();
        TopicSession topicSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        topic = topicSession.createTopic(TOPIC_NAME);
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java ---------------------------------------------------------------------- diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java new file mode 100644 index 0000000..3e41dc9 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584ConcurrentDlqTest.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.activemq.bugs;

import java.io.File;
import java.io.FilenameFilter;
import java.util.Arrays;
import java.util.Properties;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.IntrospectionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// variation on AMQ2584 where the DLQ consumer works in parallel to producer so
// that some dups are not suppressed as they are already acked by the consumer
// the audit needs to be disabled to allow these dupes to be consumed
public class AMQ2584ConcurrentDlqTest extends org.apache.activemq.TestSupport {

    static final Logger LOG = LoggerFactory.getLogger(AMQ2584ConcurrentDlqTest.class);
    BrokerService broker = null;
    ActiveMQTopic topic;

    ActiveMQConnection consumerConnection = null, producerConnection = null, dlqConnection = null;
    Session consumerSession;
    Session producerSession;
    MessageProducer producer;
    Vector<TopicSubscriber> duralbeSubs = new Vector<TopicSubscriber>();
    final int numMessages = 1000;
    final int numDurableSubs = 2;

    String data;
    // Written by the DLQ listener and polled by the test thread in
    // closeDlqConsumer(), hence volatile.
    private volatile long dlqConsumerLastReceivedTimeStamp;
    private AtomicLong dlqReceivedCount = new AtomicLong(0);

    // 2 deliveries of each message to each of the numDurableSubs subscribers
    CountDownLatch redeliveryConsumerLatch = new CountDownLatch(((2 * numMessages) * numDurableSubs) - 1);
    // should get at least numMessages, possibly more
    CountDownLatch dlqConsumerLatch = new CountDownLatch((numMessages - 1));

    /**
     * Publishes numMessages messages while the durable subscribers force
     * redelivery and a DLQ consumer drains ActiveMQ.DLQ in parallel, then
     * expects the KahaDB directory to be left with a single ".log" file.
     */
    public void testSize() throws Exception {
        openConsumer(redeliveryConsumerLatch);
        openDlqConsumer(dlqConsumerLatch);


        assertEquals(0, broker.getAdminView().getStorePercentUsage());

        for (int i = 0; i < numMessages; i++) {
            sendMessage(false);
        }

        final BrokerView brokerView = broker.getAdminView();

        broker.getSystemUsage().getStoreUsage().isFull();
        LOG.info("store percent usage: " + brokerView.getStorePercentUsage());
        assertTrue("redelivery consumer got all it needs, remaining: "
                + redeliveryConsumerLatch.getCount(), redeliveryConsumerLatch.await(60, TimeUnit.SECONDS));
        assertTrue("dql consumer got all it needs", dlqConsumerLatch.await(60, TimeUnit.SECONDS));
        closeConsumer();

        LOG.info("Giving dlq a chance to clear down once topic consumer is closed");

        // consume all of the duplicates that arrived after the first ack
        closeDlqConsumer();

        // give the broker a chance to clean obsolete messages, wait 2*cleanupInterval
        Thread.sleep(5000);

        FilenameFilter justLogFiles = new FilenameFilter() {
            public boolean accept(File file, String s) {
                return s.endsWith(".log");
            }
        };
        // List once so that the count and the names that get logged agree.
        File storeDirectory = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getDirectory();
        String[] logFiles = storeDirectory.list(justLogFiles);
        int numFiles = logFiles.length;
        if (numFiles > 2) {
            LOG.info(Arrays.toString(logFiles));
        }
        LOG.info("num files: " + numFiles);
        assertEquals("kahaDB dir should contain 1 db file,is: " + numFiles, 1, numFiles);
    }

    /**
     * Opens numDurableSubs durable subscribers that share one listener. The
     * listener counts the delivery on the latch and then calls
     * Session.recover() on the consumer session.
     */
    private void openConsumer(final CountDownLatch latch) throws Exception {
        consumerConnection = (ActiveMQConnection) createConnection();
        consumerConnection.setClientID("cliID");
        consumerConnection.start();
        consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        MessageListener listener = new MessageListener() {
            public void onMessage(Message message) {
                latch.countDown();
                try {
                    consumerSession.recover();
                } catch (Exception e) {
                    LOG.warn("session recover failed", e);
                }
            }
        };

        for (int i = 1; i <= numDurableSubs; i++) {
            TopicSubscriber sub = consumerSession.createDurableSubscriber(topic, "subName" + i);
            sub.setMessageListener(listener);
            duralbeSubs.add(sub);
        }
    }

    /**
     * Starts a consumer on ActiveMQ.DLQ that counts down the given latch and
     * records when, and how many, messages it has seen.
     */
    private void openDlqConsumer(final CountDownLatch received) throws Exception {

        dlqConnection = (ActiveMQConnection) createConnection();
        Session dlqSession = dlqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
        dlqConsumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                if (received.getCount() > 0 && received.getCount() % 200 == 0) {
                    LOG.info("remaining on DLQ: " + received.getCount());
                }
                received.countDown();
                dlqConsumerLastReceivedTimeStamp = System.currentTimeMillis();
                dlqReceivedCount.incrementAndGet();
            }
        });
        dlqConnection.start();
    }


    /**
     * Closes the durable subscribers, removes their subscriptions and closes
     * the consumer connection.
     */
    private void closeConsumer() throws JMSException {
        for (TopicSubscriber sub : duralbeSubs) {
            sub.close();
        }
        if (consumerSession != null) {
            for (int i = 1; i <= numDurableSubs; i++) {
                consumerSession.unsubscribe("subName" + i);
            }
        }
        if (consumerConnection != null) {
            consumerConnection.close();
            consumerConnection = null;
        }
    }

    /**
     * Waits until the DLQ consumer has been idle for five seconds, or thirty
     * seconds have passed, and then closes the DLQ connection.
     */
    private void closeDlqConsumer() throws JMSException, InterruptedException {
        final long limit = System.currentTimeMillis() + 30 * 1000;
        if (dlqConsumerLastReceivedTimeStamp > 0) {
            while (System.currentTimeMillis() < dlqConsumerLastReceivedTimeStamp + 5000
                    && System.currentTimeMillis() < limit) {
                LOG.info("waiting for DLQ do drain, receivedCount: " + dlqReceivedCount);
                TimeUnit.SECONDS.sleep(1);
            }
        }
        if (dlqConnection != null) {
            dlqConnection.close();
            dlqConnection = null;
        }
    }

    /**
     * Sends one message carrying the 5000-character payload in its "data"
     * property, creating the producer connection on first use. The filter
     * argument is not used.
     */
    private void sendMessage(boolean filter) throws Exception {
        if (producerConnection == null) {
            producerConnection = (ActiveMQConnection) createConnection();
            producerConnection.start();
            producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            producer = producerSession.createProducer(topic);
        }

        Message message = producerSession.createMessage();
        message.setStringProperty("data", data);
        producer.send(message);
    }

    private void startBroker(boolean deleteMessages) throws Exception {
        broker = new BrokerService();
        broker.setAdvisorySupport(false);
        broker.setBrokerName("testStoreSize");

        PolicyMap map = new PolicyMap();
        PolicyEntry entry = new PolicyEntry();
        entry.setEnableAudit(false);
        map.setDefaultEntry(entry);
        broker.setDestinationPolicy(map);

        if (deleteMessages) {
            broker.setDeleteAllMessagesOnStartup(true);
        }
        configurePersistenceAdapter(broker.getPersistenceAdapter());
        broker.getSystemUsage().getStoreUsage().setLimit(200 * 1000 * 1000);
        broker.start();
    }

    private void configurePersistenceAdapter(PersistenceAdapter persistenceAdapter) {
        Properties properties = new Properties();
        String maxFileLengthVal = String.valueOf(2 * 1024 * 1024);
        properties.put("journalMaxFileLength", maxFileLengthVal);
        properties.put("maxFileLength", maxFileLengthVal);
        properties.put("cleanupInterval", "2000");
        properties.put("checkpointInterval", "2000");
        // there are problems with duplicate dispatch in the cursor, which maintain
        // a map of messages. A dup dispatch can be dropped.
        // see: org.apache.activemq.broker.region.cursors.OrderedPendingList
        // Adding duplicate detection to the default DLQ strategy removes the problem
        // which means we can leave the default for concurrent store and dispatch q
        //properties.put("concurrentStoreAndDispatchQueues", "false");

        IntrospectionSupport.setProperties(persistenceAdapter, properties);
    }

    private void stopBroker() throws Exception {
        if (broker != null)
            broker.stop();
        broker = null;
    }

    /**
     * Best-effort close for tearDown: logs and carries on, so one failing
     * close cannot stop the remaining connections or the broker from being
     * cleaned up.
     */
    private static void closeQuietly(ActiveMQConnection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            LOG.warn("failed to close connection during tearDown", e);
        }
    }

    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory("vm://testStoreSize?jms.watchTopicAdvisories=false&jms.redeliveryPolicy.maximumRedeliveries=1&jms.redeliveryPolicy.initialRedeliveryDelay=0&waitForStart=5000&create=false");
    }

    @Override
    protected void setUp() throws Exception {
        super.setUp();

        StringBuilder sb = new StringBuilder(5000);
        for (int i = 0; i < 5000; i++) {
            sb.append('a');
        }
        data = sb.toString();

        startBroker(true);
        topic = (ActiveMQTopic) createDestination();
    }

    /**
     * Closes every connection the test may still hold (the producer
     * connection is never closed by the test body, the others only on the
     * success path), then stops the broker.
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            closeQuietly(producerConnection);
            producerConnection = null;
            closeQuietly(consumerConnection);
            consumerConnection = null;
            closeQuietly(dlqConnection);
            dlqConnection = null;
        } finally {
            try {
                stopBroker();
            } finally {
                super.tearDown();
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584Test.java new file mode 100644 index 0000000..b84b9ab --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2584Test.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.activemq.bugs;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Publishes messages to a topic with three durable subscribers, drains the
 * resulting dead letter queue and then waits for the store usage to drop back
 * below a small threshold. Runs once per persistence adapter returned by
 * {@link #getTestParameters()}.
 */
@RunWith(value = Parameterized.class)
public class AMQ2584Test extends org.apache.activemq.TestSupport {

    static final Logger LOG = LoggerFactory.getLogger(AMQ2584Test.class);
    BrokerService broker = null;
    ActiveMQTopic topic;

    ActiveMQConnection consumerConnection = null, producerConnection = null;
    Session producerSession;
    MessageProducer producer;
    final int minPercentUsageForStore = 3;
    String data;

    private final TestSupport.PersistenceAdapterChoice persistenceAdapterChoice;

    /** One parameter set for KahaDB and one for LevelDB. */
    @Parameterized.Parameters(name="{0}")
    public static Collection<TestSupport.PersistenceAdapterChoice[]> getTestParameters() {
        TestSupport.PersistenceAdapterChoice[] kahaDb = {TestSupport.PersistenceAdapterChoice.KahaDB};
        TestSupport.PersistenceAdapterChoice[] levelDb = {TestSupport.PersistenceAdapterChoice.LevelDB};
        List<TestSupport.PersistenceAdapterChoice[]> choices = new ArrayList<TestSupport.PersistenceAdapterChoice[]>();
        choices.add(kahaDb);
        choices.add(levelDb);

        return choices;
    }

    public AMQ2584Test(TestSupport.PersistenceAdapterChoice choice) {
        this.persistenceAdapterChoice = choice;
    }

    @Test(timeout = 120000)
    public void testSize() throws Exception {
        int messages = 1000;
        // One count per message for each of the three durable subscribers.
        CountDownLatch redeliveryConsumerLatch = new CountDownLatch((messages*3));
        openConsumer(redeliveryConsumerLatch);

        assertEquals(0, broker.getAdminView().getStorePercentUsage());

        for (int i = 0; i < messages; i++) {
            sendMessage(false);
        }

        final BrokerView brokerView = broker.getAdminView();

        broker.getSystemUsage().getStoreUsage().isFull();
        LOG.info("store percent usage: "+brokerView.getStorePercentUsage());
        int storePercentUsage = broker.getAdminView().getStorePercentUsage();
        assertTrue("some store in use", storePercentUsage > minPercentUsageForStore);

        assertTrue("redelivery consumer got all it needs", redeliveryConsumerLatch.await(60, TimeUnit.SECONDS));
        closeConsumer();

        // consume from DLQ
        final CountDownLatch received = new CountDownLatch(messages);
        consumerConnection = (ActiveMQConnection) createConnection();
        Session dlqSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ"));
        dlqConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (received.getCount() % 500 == 0) {
                    LOG.info("remaining on DLQ: " + received.getCount());
                }
                received.countDown();
            }
        });
        consumerConnection.start();

        assertTrue("Not all messages reached the DLQ", received.await(60, TimeUnit.SECONDS));

        assertTrue("Store usage exceeds expected usage",
                Wait.waitFor(new Wait.Condition() {
                    @Override
                    public boolean isSatisified() throws Exception {
                        broker.getSystemUsage().getStoreUsage().isFull();
                        LOG.info("store precent usage: "+brokerView.getStorePercentUsage());
                        return broker.getAdminView().getStorePercentUsage() < minPercentUsageForStore;
                    }
                }));

        closeConsumer();

    }

    /**
     * Opens three durable subscribers that share one listener. The listener
     * counts the delivery on the latch and then calls Session.recover(); the
     * connection URL sets maximumRedeliveries=0.
     */
    private void openConsumer(final CountDownLatch latch) throws Exception {
        consumerConnection = (ActiveMQConnection) createConnection();
        consumerConnection.setClientID("cliID");
        consumerConnection.start();
        final Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        MessageListener listener = new MessageListener() {
            @Override
            public void onMessage(Message message) {
                latch.countDown();
                try {
                    session.recover();
                } catch (Exception e) {
                    LOG.warn("session recover failed", e);
                }

            }
        };

        session.createDurableSubscriber(topic, "subName1").setMessageListener(listener);
        session.createDurableSubscriber(topic, "subName2").setMessageListener(listener);
        session.createDurableSubscriber(topic, "subName3").setMessageListener(listener);
    }

    private void closeConsumer() throws JMSException {
        if (consumerConnection != null)
            consumerConnection.close();
        consumerConnection = null;
    }

    /**
     * Sends one message carrying the 5000-character payload in its "data"
     * property, creating the producer connection on first use. The filter
     * argument is not used.
     */
    private void sendMessage(boolean filter) throws Exception {
        if (producerConnection == null) {
            producerConnection = (ActiveMQConnection) createConnection();
            producerConnection.start();
            producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            producer = producerSession.createProducer(topic);
        }

        Message message = producerSession.createMessage();
        message.setStringProperty("data", data);
        producer.send(message);
    }

    private void startBroker(boolean deleteMessages) throws Exception {
        broker = new BrokerService();
        broker.setAdvisorySupport(false);
        broker.setBrokerName("testStoreSize");

        if (deleteMessages) {
            broker.setDeleteAllMessagesOnStartup(true);
        }
        LOG.info("Starting broker with persistenceAdapterChoice " + persistenceAdapterChoice.toString());
        setPersistenceAdapter(broker, persistenceAdapterChoice);
        configurePersistenceAdapter(broker.getPersistenceAdapter());
        broker.getSystemUsage().getStoreUsage().setLimit(200 * 1000 * 1000);
        broker.start();
    }

    private void configurePersistenceAdapter(PersistenceAdapter persistenceAdapter) {
        Properties properties = new Properties();
        String maxFileLengthVal = String.valueOf(1 * 1024 * 1024);
        properties.put("journalMaxFileLength", maxFileLengthVal);
        properties.put("maxFileLength", maxFileLengthVal);
        properties.put("cleanupInterval", "2000");
        properties.put("checkpointInterval", "2000");

        IntrospectionSupport.setProperties(persistenceAdapter, properties);
    }

    private void stopBroker() throws Exception {
        if (broker != null)
            broker.stop();
        broker = null;
    }

    /**
     * Best-effort close for tearDown: logs and carries on, so one failing
     * close cannot stop the other connection or the broker from being
     * cleaned up.
     */
    private static void closeQuietly(ActiveMQConnection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException e) {
            LOG.warn("failed to close connection during tearDown", e);
        }
    }

    @Override
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory("vm://testStoreSize?jms.watchTopicAdvisories=false&jms.redeliveryPolicy.maximumRedeliveries=0&jms.closeTimeout=60000&waitForStart=5000&create=false");
    }

    @Override
    @Before
    public void setUp() throws Exception {
        StringBuilder sb = new StringBuilder(5000);
        for (int i = 0; i < 5000; i++) {
            sb.append('a');
        }
        data = sb.toString();

        startBroker(true);
        topic = (ActiveMQTopic) createDestination();
    }

    /**
     * Closes the connections the test may still hold (the producer connection
     * is never closed by the test body, the consumer connection only on the
     * success path) and then stops the broker.
     */
    @Override
    @After
    public void tearDown() throws Exception {
        try {
            closeQuietly(consumerConnection);
            consumerConnection = null;
            closeQuietly(producerConnection);
            producerConnection = null;
        } finally {
            stopBroker();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2585Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2585Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2585Test.java new file mode 100644 index 0000000..3f515d9 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2585Test.java @@ -0,0 +1,83 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.activemq.bugs;

import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.EmbeddedBrokerAndConnectionTestSupport;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.Message;
import org.apache.activemq.spring.ConsumerBean;

/**
 * Sends a single text message with one string property through a broker whose bind
 * address has "?marshal=true" appended, and checks that the received message reports a
 * size covering both its body and its property.
 */
public class AMQ2585Test extends EmbeddedBrokerAndConnectionTestSupport {
    static final String LENGTH10STRING = "1234567890";

    private final Destination destination = new ActiveMQQueue("MyQueue");
    private Session session;
    private MessageProducer producer;
    private ConsumerBean messageList;

    /**
     * Round-trips one message whose body, property name and property value are all
     * {@link #LENGTH10STRING}, then verifies content, properties and reported size.
     */
    public void testOneMessageWithProperties() throws Exception {
        TextMessage outgoing = session.createTextMessage(LENGTH10STRING);
        outgoing.setStringProperty(LENGTH10STRING, LENGTH10STRING);
        producer.send(outgoing);

        messageList.assertMessagesArrived(1);

        Object first = messageList.flushMessages().get(0);
        ActiveMQTextMessage delivered = (ActiveMQTextMessage) first;

        assertEquals(LENGTH10STRING, delivered.getText());
        assertTrue(delivered.getProperties().size() > 0);
        assertTrue(delivered.propertyExists(LENGTH10STRING));
        assertEquals(LENGTH10STRING, delivered.getStringProperty(LENGTH10STRING));

        /*
         * getSize() documents the body's memory usage as text length * 2. How property
         * memory is accounted for is unclear, but it should be at least the summed string
         * lengths of the property key and value.
         */
        final int lowerBound = Message.DEFAULT_MINIMUM_MESSAGE_SIZE + 4 * LENGTH10STRING.length();
        assertTrue("Message size was smaller than expected: " + delivered.getSize(),
                delivered.getSize() >= lowerBound);
        assertTrue(LENGTH10STRING.length() * 2 != delivered.getSize());
    }

    /**
     * Appends "?marshal=true" to the bind address before the parent starts the broker,
     * then wires a verbose {@link ConsumerBean} listener and a producer to the queue.
     */
    @Override
    protected void setUp() throws Exception {
        bindAddress += "?marshal=true";
        super.setUp();

        messageList = new ConsumerBean();
        messageList.setVerbose(true);

        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        MessageConsumer queueConsumer = session.createConsumer(destination);
        queueConsumer.setMessageListener(messageList);

        producer = session.createProducer(destination);
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java new file mode 100644 index 0000000..4f6f168 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.activemq.bugs;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.IOHelper;

/**
 * Checks that broker memory usage returns to its starting value after the connection
 * that owns a temporary queue is closed while that queue still holds messages.
 */
public class AMQ2616Test extends TestCase {
    private static final int NUMBER = 2000;
    private static final String ACTIVEMQ_BROKER_BIND = "tcp://0.0.0.0:0";

    /** Upper bound on how long the test waits for memory usage to drop back. */
    private static final long USAGE_SETTLE_TIMEOUT_MS = 10000;
    /** Pause between two memory usage readings while waiting. */
    private static final long USAGE_POLL_INTERVAL_MS = 100;

    private BrokerService brokerService;
    private final List<Thread> threads = new ArrayList<Thread>();
    private final AtomicBoolean shutdown = new AtomicBoolean();

    private String connectionUri;

    /**
     * Fills a temporary queue with {@code NUMBER} 4 KiB messages from a second connection,
     * asserts that broker memory usage changed, closes the connection owning the temporary
     * queue, and asserts that usage returns to the value measured before sending.
     *
     * <p>Both connections are closed in {@code finally} so a failed assertion does not leak
     * them into {@code tearDown()}.
     */
    public void testQueueResourcesReleased() throws Exception {
        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(connectionUri);
        Connection tempConnection = fac.createConnection();
        Connection testConnection = null;
        try {
            tempConnection.start();
            Session tempSession = tempConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue tempQueue = tempSession.createTemporaryQueue();

            testConnection = fac.createConnection();
            long startUsage = currentMemoryUsage();
            Session testSession = testConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer testProducer = testSession.createProducer(tempQueue);
            byte[] payload = new byte[1024 * 4];
            for (int i = 0; i < NUMBER; i++) {
                BytesMessage msg = testSession.createBytesMessage();
                msg.writeBytes(payload);
                testProducer.send(msg);
            }
            long endUsage = currentMemoryUsage();
            assertTrue("Memory usage did not change after sending " + NUMBER + " messages",
                    startUsage != endUsage);

            // Closing the owner of the temporary queue is the action under test.
            tempConnection.close();

            // Poll instead of a fixed sleep: passes as soon as the usage is released and
            // tolerates a slow machine up to the timeout.
            endUsage = awaitMemoryUsage(startUsage, USAGE_SETTLE_TIMEOUT_MS);
            assertEquals(startUsage, endUsage);
        } finally {
            try {
                if (testConnection != null) {
                    testConnection.close();
                }
            } finally {
                // Closing an already closed JMS connection is a no-op.
                tempConnection.close();
            }
        }
    }

    /** Returns the broker's current system memory usage reading. */
    private long currentMemoryUsage() {
        return brokerService.getSystemUsage().getMemoryUsage().getUsage();
    }

    /**
     * Polls the memory usage until it equals {@code expected} or the timeout elapses.
     *
     * @return the last usage value read
     */
    private long awaitMemoryUsage(long expected, long timeoutMillis) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeoutMillis;
        long usage = currentMemoryUsage();
        while (usage != expected && System.currentTimeMillis() < deadline) {
            Thread.sleep(USAGE_POLL_INTERVAL_MS);
            usage = currentMemoryUsage();
        }
        return usage;
    }

    /**
     * Starts an embedded broker with a clean KahaDB directory under target/AMQ2616Test, a
     * file-based pending-queue policy for all queues, and a TCP connector on an ephemeral
     * port; records the connector's publishable URI for the test.
     */
    @Override
    protected void setUp() throws Exception {
        // Start an embedded broker up.
        brokerService = new BrokerService();

        KahaDBPersistenceAdapter adaptor = new KahaDBPersistenceAdapter();
        adaptor.setEnableJournalDiskSyncs(false);
        File file = new File("target/AMQ2616Test");
        IOHelper.mkdirs(file);
        IOHelper.deleteChildren(file);
        adaptor.setDirectory(file);
        brokerService.setPersistenceAdapter(adaptor);

        PolicyMap policyMap = new PolicyMap();
        PolicyEntry pe = new PolicyEntry();
        pe.setMemoryLimit(10 * 1024 * 1024);
        pe.setOptimizedDispatch(true);
        pe.setProducerFlowControl(false);
        pe.setExpireMessagesPeriod(1000);
        pe.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
        policyMap.put(new ActiveMQQueue(">"), pe);
        brokerService.setDestinationPolicy(policyMap);
        brokerService.getSystemUsage().getMemoryUsage().setLimit(20 * 1024 * 1024);
        brokerService.getSystemUsage().getTempUsage().setLimit(200 * 1024 * 1024);
        brokerService.addConnector(ACTIVEMQ_BROKER_BIND);
        brokerService.start();
        brokerService.waitUntilStarted();

        connectionUri = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
    }

    /**
     * Signals shutdown, interrupts and joins any registered threads, then stops the
     * broker. The broker is stopped even if joining a thread fails, and a broker that was
     * never created (setUp failed early) is skipped.
     */
    @Override
    protected void tearDown() throws Exception {
        // Stop any running threads.
        shutdown.set(true);
        try {
            for (Thread t : threads) {
                t.interrupt();
                t.join();
            }
        } finally {
            if (brokerService != null) {
                brokerService.stop();
                brokerService.waitUntilStopped();
            }
        }
    }

}
