http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
new file mode 100644
index 0000000..f7409dd
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266StarvedConsumerTest.java
@@ -0,0 +1,620 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+
+/*
+ * pause producers if consumers stall and verify broker drained before resume
+ */
+@RunWith(Parameterized.class)
+public class AMQ5266StarvedConsumerTest {
+    /** Logger shared by the test and its nested publisher/consumer helpers. */
+    static final Logger LOG = LoggerFactory.getLogger(AMQ5266StarvedConsumerTest.class);
+
+    /** Connect URI of the embedded broker; assigned by startBroker() once the broker has started. */
+    String activemqURL;
+
+    /** Embedded broker under test; a fresh instance is created by startBroker(). */
+    BrokerService brokerService;
+
+    /** Number of characters in every message body (see getMessageText()). */
+    public int messageSize = 1000;
+
+    // The initial values on the @Parameter fields below are only defaults: the
+    // Parameterized runner overwrites each field with the matching column of a
+    // parameters() row after the test instance has been constructed.
+
+    /** Number of messages each publisher thread sends. */
+    @Parameterized.Parameter(0)
+    public int publisherMessagesPerThread = 1000;
+
+    /** Number of publisher threads. */
+    @Parameterized.Parameter(1)
+    public int publisherThreadCount = 20;
+
+    /** Number of consumer threads created for each queue. */
+    @Parameterized.Parameter(2)
+    public int consumerThreadsPerQueue = 5;
+
+    /** Value passed to PolicyEntry.setMemoryLimit for the default destination policy. */
+    @Parameterized.Parameter(3)
+    public int destMemoryLimit = 50 * 1024;
+
+    /** Value passed to PolicyEntry.setUseCache for the default destination policy. */
+    @Parameterized.Parameter(4)
+    public boolean useCache = true;
+
+    /** Persistence store installed on the broker through TestSupport.setPersistenceAdapter. */
+    @Parameterized.Parameter(5)
+    public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB;
+
+    /** Value passed to PolicyEntry.setOptimizedDispatch for the default destination policy. */
+    @Parameterized.Parameter(6)
+    public boolean optimizeDispatch = false;
+
+    /**
+     * Set to true by a consumer thread whose receive timed out before its queue
+     * reached the expected total. Nothing in this class resets it; once it is set,
+     * every publisher waits on globalProducerHalt after each commit.
+     */
+    private final AtomicBoolean didNotReceive = new AtomicBoolean(false);
+
+    /**
+     * Builds the parameter rows for the Parameterized runner. Column order matches
+     * the {@code @Parameter} indices: messages per publisher thread, publisher
+     * threads, consumer threads per queue, destination memory limit, useCache,
+     * persistence store, optimized dispatch.
+     *
+     * @return two load profiles, each repeated for KahaDB, LevelDB and JDBC
+     */
+    @Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},store:{5},optimizedDispatch:{6}")
+    public static Iterable<Object[]> parameters() {
+        final TestSupport.PersistenceAdapterChoice[] stores = {
+                TestSupport.PersistenceAdapterChoice.KahaDB,
+                TestSupport.PersistenceAdapterChoice.LevelDB,
+                TestSupport.PersistenceAdapterChoice.JDBC,
+        };
+        // {messages per thread, publisher threads, consumer threads per queue}
+        final int[][] loads = {
+                {1000, 40, 5},
+                {500, 20, 20},
+        };
+
+        List<Object[]> rows = new ArrayList<Object[]>();
+        for (int[] load : loads) {
+            for (TestSupport.PersistenceAdapterChoice store : stores) {
+                rows.add(new Object[]{load[0], load[1], load[2], 1024 * 1024, false, store, true});
+            }
+        }
+        return rows;
+    }
+
+    /** Messages a consumer reads per transaction; also used as the consumer prefetch size. */
+    public int consumerBatchSize = 5;
+
+    @Before
+    public void startBroker() throws Exception {
+        brokerService = new BrokerService();
+        TestSupport.setPersistenceAdapter(brokerService, 
persistenceAdapterChoice);
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.setUseJmx(false);
+        brokerService.setAdvisorySupport(false);
+
+
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setUseConsumerPriority(false); // 
java.lang.IllegalArgumentException: Comparison method violates its general 
contract!
+        defaultEntry.setMaxAuditDepth(publisherThreadCount);
+        defaultEntry.setEnableAudit(true);
+        defaultEntry.setUseCache(useCache);
+        defaultEntry.setMaxPageSize(1000);
+        defaultEntry.setOptimizedDispatch(optimizeDispatch);
+        defaultEntry.setMemoryLimit(destMemoryLimit);
+        defaultEntry.setExpireMessagesPeriod(0);
+        policyMap.setDefaultEntry(defaultEntry);
+        brokerService.setDestinationPolicy(policyMap);
+
+        brokerService.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 
1024);
+
+        TransportConnector transportConnector = 
brokerService.addConnector("tcp://0.0.0.0:0");
+        brokerService.start();
+        activemqURL = transportConnector.getPublishableConnectString();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+        }
+    }
+
+    CyclicBarrier globalProducerHalt = new CyclicBarrier(publisherThreadCount, 
new Runnable() {
+        @Override
+        public void run() {
+            // wait for queue size to go to zero
+            try {
+                while 
(((RegionBroker)brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount()
 > 0) {
+                    LOG.info("Total messageCount: " + 
((RegionBroker)brokerService.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
+                    TimeUnit.SECONDS.sleep(5);
+                }
+            } catch (Exception ignored) {
+                ignored.printStackTrace();
+            }
+        }
+    });
+
+    /**
+     * Publishes to a composite destination of four queues while consumer threads
+     * read each queue, then asserts that every queue saw as many distinct ids as
+     * were published.
+     *
+     * @throws Exception on any broker, JMS or threading failure
+     */
+    @Test(timeout = 30 * 60 * 1000)
+    public void test() throws Exception {
+
+        final String activemqQueues = "activemq,activemq2,activemq3,activemq4";
+        final long consumerWaitForConsumption = TimeUnit.MINUTES.toMillis(5);
+        final int expectedPerQueue = publisherMessagesPerThread * publisherThreadCount;
+
+        LOG.info("Publisher will publish " + expectedPerQueue + " messages to each queue specified.");
+        LOG.info("\nBuilding Publisher...");
+
+        ExportQueuePublisher publisher = new ExportQueuePublisher(activemqURL, activemqQueues, publisherMessagesPerThread, publisherThreadCount);
+
+        LOG.info("Building Consumer...");
+
+        ExportQueueConsumer consumer = new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, expectedPerQueue);
+
+        LOG.info("Starting Publisher...");
+
+        publisher.start();
+
+        LOG.info("Starting Consumer...");
+
+        consumer.start();
+
+        LOG.info("Waiting For Publisher Completion...");
+
+        publisher.waitForCompletion();
+
+        List<String> publishedIds = publisher.getIDs();
+        int distinctPublishedCount = new TreeSet<String>(publishedIds).size();
+
+        LOG.info("Publisher Complete. Published: " + publishedIds.size() + ", Distinct IDs Published: " + distinctPublishedCount);
+
+        long endWait = System.currentTimeMillis() + consumerWaitForConsumption;
+        while (!consumer.completed() && System.currentTimeMillis() < endWait) {
+            // Divide while still a long; casting the millisecond difference to int first could truncate it.
+            long secs = (endWait - System.currentTimeMillis()) / 1000;
+            LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
+            try {
+                Thread.sleep(10000);
+            } catch (InterruptedException e) {
+                // Stop waiting and keep the interrupt set instead of silently looping on.
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+
+        LOG.info("\nConsumer Complete: " + consumer.completed() + ", Shutting Down.");
+
+        consumer.shutdown();
+
+        LOG.info("Consumer Stats:");
+
+        for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) {
+
+            List<String> idList = entry.getValue();
+            int distinctConsumed = new TreeSet<String>(idList).size();
+            int diff = distinctPublishedCount - distinctConsumed;
+
+            LOG.info("   Queue: " + entry.getKey()
+                    + " -> Total Messages Consumed: " + idList.size()
+                    + ", Distinct IDs Consumed: " + distinctConsumed
+                    + " ( " + (diff > 0 ? diff : "NO") + " STUCK MESSAGES " + " ) ");
+
+            assertEquals("expect to get all messages!", 0, diff);
+        }
+    }
+
+    public class ExportQueuePublisher {
+
+        private final String amqUser = ActiveMQConnection.DEFAULT_USER;
+        private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
+        private ActiveMQConnectionFactory connectionFactory = null;
+        private String activemqURL = null;
+        private String activemqQueues = null;
+        // Collection of distinct IDs that the publisher has published.
+        // After a message is published, its UUID will be written to this list 
for tracking.
+        // This list of IDs (or distinct count) will be used to compare to the 
consumed list of IDs.
+        //private Set<String> ids = Collections.synchronizedSet(new 
TreeSet<String>());
+        private List<String> ids = Collections.synchronizedList(new 
ArrayList<String>());
+        private List<PublisherThread> threads;
+
+        public ExportQueuePublisher(String activemqURL, String activemqQueues, 
int messagesPerThread, int threadCount) throws Exception {
+
+            this.activemqURL = activemqURL;
+            this.activemqQueues = activemqQueues;
+
+            threads = new ArrayList<PublisherThread>();
+
+            // Build the threads and tell them how many messages to publish
+            for (int i = 0; i < threadCount; i++) {
+                PublisherThread pt = new PublisherThread(messagesPerThread);
+                threads.add(pt);
+            }
+        }
+
+        public List<String> getIDs() {
+            return ids;
+        }
+
+        // Kick off threads
+        public void start() throws Exception {
+
+            for (PublisherThread pt : threads) {
+                pt.start();
+            }
+        }
+
+        // Wait for threads to complete. They will complete once they've 
published all of their messages.
+        public void waitForCompletion() throws Exception {
+
+            for (PublisherThread pt : threads) {
+                pt.join();
+                pt.close();
+            }
+        }
+
+        private Session newSession(QueueConnection queueConnection) throws 
Exception {
+            return queueConnection.createSession(true, 
Session.SESSION_TRANSACTED);
+        }
+
+        private synchronized QueueConnection newQueueConnection() throws 
Exception {
+
+            if (connectionFactory == null) {
+                connectionFactory = new ActiveMQConnectionFactory(amqUser, 
amqPassword, activemqURL);
+                connectionFactory.setWatchTopicAdvisories(false);
+            }
+
+            // Set the redelivery count to -1 (infinite), or else messages 
will start dropping
+            // after the queue has had a certain number of failures (default 
is 6)
+            RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
+            policy.setMaximumRedeliveries(-1);
+
+            QueueConnection amqConnection = 
connectionFactory.createQueueConnection();
+            amqConnection.start();
+            return amqConnection;
+        }
+
+        private class PublisherThread extends Thread {
+
+            private int count;
+            private QueueConnection qc;
+            private Session session;
+            private MessageProducer mp;
+            private Queue q;
+
+            private PublisherThread(int count) throws Exception {
+
+                this.count = count;
+
+                // Each Thread has its own Connection and Session, so no sync 
worries
+                qc = newQueueConnection();
+                session = newSession(qc);
+
+                // In our code, when publishing to multiple queues,
+                // we're using composite destinations like below
+                q = new ActiveMQQueue(activemqQueues);
+                mp = session.createProducer(null);
+            }
+
+            public void run() {
+
+                try {
+
+                    // Loop until we've published enough messages
+                    while (count-- > 0) {
+
+                        TextMessage tm = 
session.createTextMessage(getMessageText());
+                        String id = UUID.randomUUID().toString();
+                        tm.setStringProperty("KEY", id);
+                        ids.add(id);                            // keep track 
of the key to compare against consumer
+
+                        mp.send(q, tm);
+                        session.commit();
+
+                        if (didNotReceive.get()) {
+                            globalProducerHalt.await();
+                        }
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+
+            // Called by waitForCompletion
+            public void close() {
+
+                try {
+                    mp.close();
+                } catch (Exception e) {
+                }
+
+                try {
+                    session.close();
+                } catch (Exception e) {
+                }
+
+                try {
+                    qc.close();
+                } catch (Exception e) {
+                }
+            }
+        }
+
+    }
+
+    // Cached message body. volatile because getMessageText() reads it without
+    // holding the lock before deciding whether to build it.
+    volatile String messageText;
+
+    /**
+     * Returns the message body shared by all publishers: messageSize 'X'
+     * characters, built on first use and cached in messageText.
+     *
+     * @return the cached message body
+     */
+    private String getMessageText() {
+
+        String text = messageText;
+        if (text == null) {
+
+            synchronized (this) {
+
+                // Re-check under the lock: another thread may have built it meanwhile.
+                text = messageText;
+                if (text == null) {
+
+                    StringBuilder sb = new StringBuilder(Math.max(messageSize, 0));
+                    for (int i = 0; i < messageSize; i++) {
+                        sb.append('X');
+                    }
+                    text = sb.toString();
+                    messageText = text;
+                }
+            }
+        }
+
+        return text;
+    }
+
+
+    /**
+     * Reads each queue with several transacted consumer threads and records the
+     * "KEY" property of every message received, grouped by queue name.
+     */
+    public class ExportQueueConsumer {
+
+        private final String amqUser = ActiveMQConnection.DEFAULT_USER;
+        private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
+        private final int totalToExpect;
+        private final String activemqURL;
+        private final String activemqQueues;
+        private final String[] queues;
+        // Created on first use by newQueueConnection() and shared by all consumer threads.
+        private ActiveMQConnectionFactory connectionFactory;
+        // Map of IDs that were consumed, keyed by queue name.
+        // We'll compare these against what was published to know if any got stuck or dropped.
+        private final Map<String, List<String>> idsByQueue = new HashMap<String, List<String>>();
+        private final Map<String, List<ConsumerThread>> threads = new HashMap<String, List<ConsumerThread>>();
+
+        /**
+         * Creates the consumer threads; each one opens its own connection, session
+         * and consumer here, but nothing is read until start() is called.
+         *
+         * @param activemqURL     broker connect URI
+         * @param activemqQueues  comma separated queue names; each name is trimmed
+         * @param threadsPerQueue number of consumer threads per queue
+         * @param batchSize       messages per transaction, also used as prefetch size
+         * @param totalToExpect   number of ids per queue after which its threads stop
+         * @throws Exception if a connection, session or consumer cannot be created
+         */
+        public ExportQueueConsumer(String activemqURL, String activemqQueues, int threadsPerQueue, int batchSize, int totalToExpect) throws Exception {
+
+            this.activemqURL = activemqURL;
+            this.activemqQueues = activemqQueues;
+            this.totalToExpect = totalToExpect;
+
+            queues = this.activemqQueues.split(",");
+
+            for (int i = 0; i < queues.length; i++) {
+                queues[i] = queues[i].trim();
+            }
+
+            // For each queue, create a list of threads and set up the list of ids
+            for (String q : queues) {
+
+                // Registered before the threads are built: each ConsumerThread looks its list up.
+                idsByQueue.put(q, Collections.synchronizedList(new ArrayList<String>()));
+
+                List<ConsumerThread> list = new ArrayList<ConsumerThread>();
+                for (int i = 0; i < threadsPerQueue; i++) {
+                    list.add(new ConsumerThread(q, batchSize));
+                }
+
+                threads.put(q, list);
+            }
+        }
+
+        /** Returns the live map from queue name to the synchronized list of consumed ids. */
+        public Map<String, List<String>> getIDs() {
+            return idsByQueue;
+        }
+
+        /** Starts every consumer thread. */
+        public void start() throws Exception {
+
+            for (List<ConsumerThread> list : threads.values()) {
+                for (ConsumerThread ct : list) {
+                    ct.start();
+                }
+            }
+        }
+
+        /**
+         * Asks every consumer thread to stop, then joins them all.
+         *
+         * @throws Exception if the calling thread is interrupted while joining
+         */
+        public void shutdown() throws Exception {
+
+            // Signal all threads first so they wind down in parallel.
+            for (List<ConsumerThread> list : threads.values()) {
+                for (ConsumerThread ct : list) {
+                    ct.shutdown();
+                }
+            }
+
+            for (List<ConsumerThread> list : threads.values()) {
+                for (ConsumerThread ct : list) {
+                    ct.join();
+                }
+            }
+        }
+
+        /** Opens a transacted session on the given connection. */
+        private Session newSession(QueueConnection queueConnection) throws Exception {
+            return queueConnection.createSession(true, Session.SESSION_TRANSACTED);
+        }
+
+        /** Opens and starts a new connection, creating the shared factory on first use. */
+        private synchronized QueueConnection newQueueConnection() throws Exception {
+
+            if (connectionFactory == null) {
+                connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
+                connectionFactory.setWatchTopicAdvisories(false);
+
+                // Set the redelivery count to -1 (infinite), or else messages will start dropping
+                // after the queue has had a certain number of failures (default is 6)
+                RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
+                policy.setMaximumRedeliveries(-1);
+            }
+
+            QueueConnection amqConnection = connectionFactory.createQueueConnection();
+            amqConnection.start();
+            return amqConnection;
+        }
+
+        /**
+         * Reports whether no consumer thread is currently alive.
+         *
+         * @return false as soon as one live thread is found, true otherwise
+         */
+        public boolean completed() {
+            for (List<ConsumerThread> list : threads.values()) {
+                for (ConsumerThread ct : list) {
+                    if (ct.isAlive()) {
+                        LOG.info("thread for {} is still alive.", ct.qName);
+                        return false;
+                    }
+                }
+            }
+            return true;
+        }
+
+        /** Reads one queue in transacted batches until told to stop or the queue's total is reached. */
+        private class ConsumerThread extends Thread {
+
+            private final int batchSize;
+            private final QueueConnection qc;
+            private final Session session;
+            private final MessageConsumer mc;
+            private final List<String> idList;
+            // Written by the thread calling shutdown() and polled by run(), hence volatile.
+            private volatile boolean shutdown = false;
+            private final String qName;
+
+            private ConsumerThread(String queueName, int batchSize) throws Exception {
+
+                this.batchSize = batchSize;
+
+                // Each thread has its own connection and session
+                qName = queueName;
+                qc = newQueueConnection();
+                session = newSession(qc);
+                Queue q = session.createQueue(queueName + "?consumer.prefetchSize=" + batchSize);
+                mc = session.createConsumer(q);
+
+                // Shared with the other threads reading the same queue.
+                idList = idsByQueue.get(queueName);
+            }
+
+            @Override
+            public void run() {
+
+                try {
+
+                    int count = 0;
+
+                    // Keep reading as long as it hasn't been told to shutdown
+                    while (!shutdown) {
+
+                        if (idList.size() >= totalToExpect) {
+                            LOG.info("Got {} for q: {}", idList.size(), qName);
+                            session.commit();
+                            break;
+                        }
+                        Message m = mc.receive(4000);
+
+                        if (m != null) {
+
+                            // We received a non-null message, add the ID to our list
+                            idList.add(m.getStringProperty("KEY"));
+                            count++;
+
+                            // If we've reached our batch size, commit the batch and reset the count
+                            if (count == batchSize) {
+                                session.commit();
+                                count = 0;
+                            }
+                        } else {
+
+                            // We didn't receive anything this time, commit any current batch and reset the count
+                            session.commit();
+                            count = 0;
+
+                            // Still short of the expected total: tell the publishers to pause.
+                            if (idList.size() < totalToExpect) {
+                                LOG.info("did not receive on {}, current count: {}", qName, idList.size());
+                                didNotReceive.set(true);
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    LOG.error("Consumer thread for " + qName + " failed, stopping early", e);
+                } finally {
+
+                    // Once we exit, close everything
+                    close();
+                }
+            }
+
+            /** Asks run() to leave its loop; takes effect the next time the flag is checked. */
+            public void shutdown() {
+                shutdown = true;
+            }
+
+            /** Best-effort close of consumer, session and connection. */
+            public void close() {
+
+                try {
+                    mc.close();
+                } catch (Exception e) {
+                    LOG.debug("Ignoring failure while closing consumer", e);
+                }
+
+                try {
+                    session.close();
+                } catch (Exception e) {
+                    LOG.debug("Ignoring failure while closing consumer session", e);
+                }
+
+                try {
+                    qc.close();
+                } catch (Exception e) {
+                    LOG.debug("Ignoring failure while closing consumer connection", e);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
new file mode 100644
index 0000000..e180746
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266Test.java
@@ -0,0 +1,601 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Stuck messages test client.
+ * <p/>
+ * Will kick off publisher and consumer simultaneously, and will usually result in stuck messages on the queue.
+ */
+@RunWith(Parameterized.class)
+public class AMQ5266Test {
+    /** Logger shared by the test and its nested publisher/consumer helpers. */
+    static final Logger LOG = LoggerFactory.getLogger(AMQ5266Test.class);
+
+    /**
+     * Broker connect URI. The initial value is only a placeholder: startBroker()
+     * replaces it with the connector's publishable connect string.
+     */
+    String activemqURL = "tcp://localhost:61617";
+
+    /** Embedded broker under test; a fresh instance is created by startBroker(). */
+    BrokerService brokerService;
+
+    /** Message body size. NOTE(review): its use is outside this excerpt; presumably the body length in characters. */
+    public int messageSize = 1000;
+
+    // The initial values on the @Parameter fields below are only defaults: the
+    // Parameterized runner overwrites each field with the matching column of a
+    // parameters() row after the test instance has been constructed.
+
+    /** Number of messages each publisher thread is asked to send. */
+    @Parameterized.Parameter(0)
+    public int publisherMessagesPerThread = 1000;
+
+    /** Number of publisher threads. */
+    @Parameterized.Parameter(1)
+    public int publisherThreadCount = 20;
+
+    /** Number of consumer threads requested for each queue. */
+    @Parameterized.Parameter(2)
+    public int consumerThreadsPerQueue = 5;
+
+    /** Value passed to PolicyEntry.setMemoryLimit for the default destination policy. */
+    @Parameterized.Parameter(3)
+    public int destMemoryLimit = 50 * 1024;
+
+    /** Value passed to PolicyEntry.setUseCache for the default destination policy. */
+    @Parameterized.Parameter(4)
+    public boolean useCache = true;
+
+    /** Persistence store installed on the broker through TestSupport.setPersistenceAdapter. */
+    @Parameterized.Parameter(5)
+    public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = TestSupport.PersistenceAdapterChoice.KahaDB;
+
+    /** Value passed to PolicyEntry.setOptimizedDispatch for the default destination policy. */
+    @Parameterized.Parameter(6)
+    public boolean optimizeDispatch = false;
+
+    /**
+     * Supplies the parameter rows for the Parameterized runner. Column order
+     * matches the @Parameter indices: messages per publisher thread, publisher
+     * threads, consumer threads per queue, destination memory limit, useCache,
+     * persistence store, optimized dispatch.
+     *
+     * Rows are grouped by store (JDBC, KahaDB, LevelDB).
+     * NOTE(review): the JDBC group has no {100, 5, 5, ...} row, unlike the other
+     * two groups - confirm that this is intentional.
+     */
+    
@Parameterized.Parameters(name="#{0},producerThreads:{1},consumerThreads:{2},mL:{3},useCache:{4},store:{5},optimizedDispatch:{6}")
+    public static Iterable<Object[]> parameters() {
+        return Arrays.asList(new Object[][]{
+                {1,    1,   1,   50*1024,   false, 
TestSupport.PersistenceAdapterChoice.JDBC, true},
+                {1000, 20,  5,   50*1024,   true,  
TestSupport.PersistenceAdapterChoice.JDBC, false},
+                {100,  20,  5,   50*1024,   false, 
TestSupport.PersistenceAdapterChoice.JDBC, false},
+                {1000, 5,   20,  50*1024,   true,  
TestSupport.PersistenceAdapterChoice.JDBC, false},
+                {1000, 20,  20,  1024*1024, true,  
TestSupport.PersistenceAdapterChoice.JDBC, false},
+
+                {1,    1,   1,   50*1024,   false, 
TestSupport.PersistenceAdapterChoice.KahaDB, true},
+                {100,  5,   5,   50*1024,   false, 
TestSupport.PersistenceAdapterChoice.KahaDB, false},
+                {1000, 20,  5,   50*1024,   true,  
TestSupport.PersistenceAdapterChoice.KahaDB, false},
+                {100,  20,  5,   50*1024,   false, 
TestSupport.PersistenceAdapterChoice.KahaDB, false},
+                {1000, 5,   20,  50*1024,   true,  
TestSupport.PersistenceAdapterChoice.KahaDB, false},
+                {1000, 20,  20,  1024*1024, true,  
TestSupport.PersistenceAdapterChoice.KahaDB, false},
+
+                {1,    1,   1,   50*1024,   false, 
TestSupport.PersistenceAdapterChoice.LevelDB, true},
+                {100,  5,   5,   50*1024,   false, 
TestSupport.PersistenceAdapterChoice.LevelDB, false},
+                {1000, 20,  5,   50*1024,   true,  
TestSupport.PersistenceAdapterChoice.LevelDB, false},
+                {100,  20,  5,   50*1024,   false, 
TestSupport.PersistenceAdapterChoice.LevelDB, false},
+                {1000, 5,   20,  50*1024,   true,  
TestSupport.PersistenceAdapterChoice.LevelDB, false},
+                {1000, 20,  20,  1024*1024, true,  
TestSupport.PersistenceAdapterChoice.LevelDB, false},
+
+        });
+    }
+
+    // Batch size handed to the consumer helper built in test().
+    public int consumerBatchSize = 5;
+
+    /**
+     * Brings up an embedded broker configured from the injected test parameters
+     * and stores its connect URI in activemqURL.
+     *
+     * @throws Exception if the broker cannot be configured or started
+     */
+    @Before
+    public void startBroker() throws Exception {
+        // Default destination policy, driven by the test parameters.
+        final PolicyEntry policy = new PolicyEntry();
+        // Consumer priority is switched off; the original note on this line reads
+        // "java.lang.IllegalArgumentException: Comparison method violates its
+        // general contract!" (presumably what was observed with it enabled).
+        policy.setUseConsumerPriority(false);
+        policy.setEnableAudit(true);
+        policy.setMaxAuditDepth(publisherThreadCount);
+        policy.setUseCache(useCache);
+        policy.setOptimizedDispatch(optimizeDispatch);
+        policy.setMemoryLimit(destMemoryLimit);
+        policy.setMaxPageSize(1000);
+        policy.setExpireMessagesPeriod(0);
+
+        final PolicyMap policies = new PolicyMap();
+        policies.setDefaultEntry(policy);
+
+        brokerService = new BrokerService();
+        TestSupport.setPersistenceAdapter(brokerService, persistenceAdapterChoice);
+        brokerService.setDeleteAllMessagesOnStartup(true);
+        brokerService.setUseJmx(false);
+        brokerService.setDestinationPolicy(policies);
+        brokerService.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024);
+
+        // Port 0 is requested, so the usable URI is only read after start().
+        final TransportConnector connector = brokerService.addConnector("tcp://0.0.0.0:0");
+        brokerService.start();
+        activemqURL = connector.getPublishableConnectString();
+    }
+
+    /**
+     * Stops the embedded broker; does nothing when no broker was created.
+     *
+     * @throws Exception if the broker fails to stop
+     */
+    @After
+    public void stopBroker() throws Exception {
+        if (brokerService == null) {
+            return;
+        }
+        brokerService.stop();
+    }
+
+    /**
+     * Starts publisher and consumer together against the two-queue destination
+     * list, then asserts that every queue saw as many distinct ids as were
+     * published.
+     *
+     * @throws Exception on any broker, JMS or threading failure
+     */
+    @Test
+    public void test() throws Exception {
+
+        final String activemqQueues = "activemq,activemq2";
+        final long consumerWaitForConsumption = TimeUnit.MINUTES.toMillis(5);
+        final int expectedPerQueue = publisherMessagesPerThread * publisherThreadCount;
+
+        LOG.info("Publisher will publish " + expectedPerQueue + " messages to each queue specified.");
+        LOG.info("\nBuilding Publisher...");
+
+        ExportQueuePublisher publisher = new ExportQueuePublisher(activemqURL, activemqQueues, publisherMessagesPerThread, publisherThreadCount);
+
+        LOG.info("Building Consumer...");
+
+        ExportQueueConsumer consumer = new ExportQueueConsumer(activemqURL, activemqQueues, consumerThreadsPerQueue, consumerBatchSize, expectedPerQueue);
+
+        LOG.info("Starting Publisher...");
+
+        publisher.start();
+
+        LOG.info("Starting Consumer...");
+
+        consumer.start();
+
+        LOG.info("Waiting For Publisher Completion...");
+
+        publisher.waitForCompletion();
+
+        List<String> publishedIds = publisher.getIDs();
+        int distinctPublishedCount = new TreeSet<String>(publishedIds).size();
+
+        LOG.info("Publisher Complete. Published: " + publishedIds.size() + ", Distinct IDs Published: " + distinctPublishedCount);
+
+        long endWait = System.currentTimeMillis() + consumerWaitForConsumption;
+        while (!consumer.completed() && System.currentTimeMillis() < endWait) {
+            // Divide while still a long; casting the millisecond difference to int first could truncate it.
+            long secs = (endWait - System.currentTimeMillis()) / 1000;
+            LOG.info("Waiting For Consumer Completion. Time left: " + secs + " secs");
+            try {
+                Thread.sleep(10000);
+            } catch (InterruptedException e) {
+                // Stop waiting and keep the interrupt set instead of silently looping on.
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+
+        LOG.info("\nConsumer Complete: " + consumer.completed() + ", Shutting Down.");
+
+        consumer.shutdown();
+
+        LOG.info("Consumer Stats:");
+
+        for (Map.Entry<String, List<String>> entry : consumer.getIDs().entrySet()) {
+
+            List<String> idList = entry.getValue();
+            int distinctConsumed = new TreeSet<String>(idList).size();
+            int diff = distinctPublishedCount - distinctConsumed;
+
+            LOG.info("   Queue: " + entry.getKey()
+                    + " -> Total Messages Consumed: " + idList.size()
+                    + ", Distinct IDs Consumed: " + distinctConsumed
+                    + " ( " + (diff > 0 ? diff : "NO") + " STUCK MESSAGES " + " ) ");
+
+            assertEquals("expect to get all messages!", 0, diff);
+        }
+    }
+
+    public class ExportQueuePublisher {
+
+        // Credentials handed to the connection factory (ActiveMQ's default user and password).
+        private final String amqUser = ActiveMQConnection.DEFAULT_USER;
+        private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
+        // Created on first use by newQueueConnection().
+        private ActiveMQConnectionFactory connectionFactory = null;
+        // Both assigned from the constructor arguments.
+        private String activemqURL = null;
+        private String activemqQueues = null;
+        // Collection of distinct IDs that the publisher has published.
+        // After a message is published, its UUID will be written to this list 
for tracking.
+        // This list of IDs (or distinct count) will be used to compare to the 
consumed list of IDs.
+        //private Set<String> ids = Collections.synchronizedSet(new 
TreeSet<String>());
+        // A synchronized list (not a set), so duplicates would be kept; the test
+        // derives the distinct count itself.
+        private List<String> ids = Collections.synchronizedList(new 
ArrayList<String>());
+        // Publisher threads, created by the constructor.
+        private List<PublisherThread> threads;
+
+        /**
+         * Remembers the connection settings and builds the publisher threads
+         * without starting them.
+         *
+         * @param activemqURL       broker connect URI
+         * @param activemqQueues    destination name(s) the threads will be built for
+         * @param messagesPerThread message count handed to each thread
+         * @param threadCount       number of publisher threads to build
+         * @throws Exception if a publisher thread cannot be constructed
+         */
+        public ExportQueuePublisher(String activemqURL, String activemqQueues, int messagesPerThread, int threadCount) throws Exception {
+            this.activemqURL = activemqURL;
+            this.activemqQueues = activemqQueues;
+            this.threads = new ArrayList<PublisherThread>();
+
+            // One thread per requested publisher, each told how many messages to publish.
+            int remaining = threadCount;
+            while (remaining-- > 0) {
+                this.threads.add(new PublisherThread(messagesPerThread));
+            }
+        }
+
+        public List<String> getIDs() {
+            return ids;
+        }
+
+        // Kick off threads
+        public void start() throws Exception {
+
+            for (PublisherThread pt : threads) {
+                pt.start();
+            }
+        }
+
+        // Wait for threads to complete. They will complete once they've 
published all of their messages.
+        public void waitForCompletion() throws Exception {
+
+            for (PublisherThread pt : threads) {
+                pt.join();
+                pt.close();
+            }
+        }
+
+        private Session newSession(QueueConnection queueConnection) throws 
Exception {
+            return queueConnection.createSession(true, 
Session.SESSION_TRANSACTED);
+        }
+
+        private synchronized QueueConnection newQueueConnection() throws 
Exception {
+
+            if (connectionFactory == null) {
+                connectionFactory = new ActiveMQConnectionFactory(amqUser, 
amqPassword, activemqURL);
+            }
+
+            // Set the redelivery count to -1 (infinite), or else messages 
will start dropping
+            // after the queue has had a certain number of failures (default 
is 6)
+            RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
+            policy.setMaximumRedeliveries(-1);
+
+            QueueConnection amqConnection = 
connectionFactory.createQueueConnection();
+            amqConnection.start();
+            return amqConnection;
+        }
+
+        private class PublisherThread extends Thread {
+
+            private int count;
+            private QueueConnection qc;
+            private Session session;
+            private MessageProducer mp;
+
+            private PublisherThread(int count) throws Exception {
+
+                this.count = count;
+
+                // Each Thread has its own Connection and Session, so no sync 
worries
+                qc = newQueueConnection();
+                session = newSession(qc);
+
+                // In our code, when publishing to multiple queues,
+                // we're using composite destinations like below
+                Queue q = new ActiveMQQueue(activemqQueues);
+                mp = session.createProducer(q);
+            }
+
+            public void run() {
+
+                try {
+
+                    // Loop until we've published enough messages
+                    while (count-- > 0) {
+
+                        TextMessage tm = 
session.createTextMessage(getMessageText());
+                        String id = UUID.randomUUID().toString();
+                        tm.setStringProperty("KEY", id);
+                        ids.add(id);                            // keep track 
of the key to compare against consumer
+
+                        mp.send(tm);
+                        session.commit();
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+
+            // Called by waitForCompletion
+            public void close() {
+
+                try {
+                    mp.close();
+                } catch (Exception e) {
+                }
+
+                try {
+                    session.close();
+                } catch (Exception e) {
+                }
+
+                try {
+                    qc.close();
+                } catch (Exception e) {
+                }
+            }
+        }
+
+    }
+
+    // Lazily built message payload. Declared volatile so that the double-checked
+    // initialization in getMessageText() is safe under the Java memory model.
+    volatile String messageText;
+
+    /**
+     * Returns the message payload, a string of {@code messageSize} 'X' characters.
+     * The string is built on first use and cached for later calls.
+     */
+    private String getMessageText() {
+
+        // Read the field once into a local so the value that was checked is the value returned.
+        String text = messageText;
+
+        if (text == null) {
+
+            synchronized (this) {
+
+                text = messageText;
+
+                if (text == null) {
+
+                    StringBuilder sb = new StringBuilder();
+                    for (int i = 0; i < messageSize; i++) {
+                        sb.append("X");
+                    }
+                    text = sb.toString();
+                    messageText = text;
+                }
+            }
+        }
+
+        return text;
+    }
+
+
+    /**
+     * Consumes messages from a comma-separated list of queues using several threads per
+     * queue, and records the "KEY" property of every message received, grouped by queue name.
+     */
+    public class ExportQueueConsumer {
+
+        private final String amqUser = ActiveMQConnection.DEFAULT_USER;
+        private final String amqPassword = ActiveMQConnection.DEFAULT_PASSWORD;
+        private final int totalToExpect;
+        private ActiveMQConnectionFactory connectionFactory = null;
+        private final String activemqURL;
+        private final String activemqQueues;
+        private final String[] queues;
+        // Map of IDs that were consumed, keyed by queue name.
+        // We'll compare these against what was published to know if any got stuck or dropped.
+        private final Map<String, List<String>> idsByQueue = new HashMap<String, List<String>>();
+        private final Map<String, List<ConsumerThread>> threads;
+
+        /**
+         * Creates {@code threadsPerQueue} consumer threads for every queue named in
+         * {@code activemqQueues}. The threads are not started until {@link #start()} is called.
+         *
+         * @param activemqURL     broker URL handed to the connection factory
+         * @param activemqQueues  comma-separated queue names; surrounding whitespace is trimmed
+         * @param threadsPerQueue number of consumer threads per queue
+         * @param batchSize       messages per commit; also used as the consumer prefetch size
+         * @param totalToExpect   per-queue message count at which a consumer thread stops
+         * @throws Exception if a connection, session or consumer cannot be created
+         */
+        public ExportQueueConsumer(String activemqURL, String activemqQueues, int threadsPerQueue, int batchSize, int totalToExpect) throws Exception {
+
+            this.activemqURL = activemqURL;
+            this.activemqQueues = activemqQueues;
+            this.totalToExpect = totalToExpect;
+
+            queues = this.activemqQueues.split(",");
+
+            for (int i = 0; i < queues.length; i++) {
+                queues[i] = queues[i].trim();
+            }
+
+            threads = new HashMap<String, List<ConsumerThread>>();
+
+            try {
+                // For each queue, create a list of threads and set up the list of ids
+                for (String q : queues) {
+
+                    List<ConsumerThread> list = new ArrayList<ConsumerThread>();
+
+                    // Register the list first so a partially built queue is also cleaned up on failure.
+                    threads.put(q, list);
+
+                    idsByQueue.put(q, Collections.synchronizedList(new ArrayList<String>()));
+
+                    for (int i = 0; i < threadsPerQueue; i++) {
+                        list.add(new ConsumerThread(q, batchSize));
+                    }
+                }
+            } catch (Exception e) {
+                // Do not leak the connections of the threads that were already built.
+                for (List<ConsumerThread> list : threads.values()) {
+                    for (ConsumerThread ct : list) {
+                        ct.close();
+                    }
+                }
+                throw e;
+            }
+        }
+
+        /**
+         * @return the consumed message IDs, keyed by queue name; the lists are live and synchronized
+         */
+        public Map<String, List<String>> getIDs() {
+            return idsByQueue;
+        }
+
+        /**
+         * Starts every consumer thread.
+         */
+        public void start() throws Exception {
+
+            for (List<ConsumerThread> list : threads.values()) {
+
+                for (ConsumerThread ct : list) {
+
+                    ct.start();
+                }
+            }
+        }
+
+        /**
+         * Tells all threads to stop, then waits for them to finish.
+         */
+        public void shutdown() throws Exception {
+
+            for (List<ConsumerThread> list : threads.values()) {
+
+                for (ConsumerThread ct : list) {
+
+                    ct.shutdown();
+                }
+            }
+
+            for (List<ConsumerThread> list : threads.values()) {
+
+                for (ConsumerThread ct : list) {
+
+                    ct.join();
+                }
+            }
+        }
+
+        private Session newSession(QueueConnection queueConnection) throws Exception {
+            return queueConnection.createSession(true, Session.SESSION_TRANSACTED);
+        }
+
+        /**
+         * Creates and starts a new connection, lazily creating the shared connection factory.
+         * Synchronized because the factory is created on first use.
+         */
+        private synchronized QueueConnection newQueueConnection() throws Exception {
+
+            if (connectionFactory == null) {
+                connectionFactory = new ActiveMQConnectionFactory(amqUser, amqPassword, activemqURL);
+            }
+
+            // Set the redelivery count to -1 (infinite), or else messages will start dropping
+            // after the queue has had a certain number of failures (default is 6)
+            RedeliveryPolicy policy = connectionFactory.getRedeliveryPolicy();
+            policy.setMaximumRedeliveries(-1);
+
+            QueueConnection amqConnection = connectionFactory.createQueueConnection();
+            amqConnection.start();
+            return amqConnection;
+        }
+
+        /**
+         * @return {@code true} once no consumer thread is alive any more
+         */
+        public boolean completed() {
+            for (List<ConsumerThread> list : threads.values()) {
+
+                for (ConsumerThread ct : list) {
+
+                    if (ct.isAlive()) {
+                        LOG.info("thread for {} is still alive.", ct.qName);
+                        return false;
+                    }
+                }
+            }
+            return true;
+        }
+
+        /**
+         * Receives from one queue until told to shut down or until the queue's shared ID list
+         * has reached {@code totalToExpect} entries, committing in batches.
+         */
+        private class ConsumerThread extends Thread {
+
+            private final int batchSize;
+            private QueueConnection qc;
+            private Session session;
+            private MessageConsumer mc;
+            private List<String> idList;
+            // Written by the thread calling shutdown() and polled by run(): must be volatile.
+            private volatile boolean shutdown = false;
+            private final String qName;
+
+            private ConsumerThread(String queueName, int batchSize) throws Exception {
+
+                this.batchSize = batchSize;
+                qName = queueName;
+
+                try {
+                    // Each thread has its own connection and session
+                    qc = newQueueConnection();
+                    session = newSession(qc);
+                    Queue q = session.createQueue(queueName + "?consumer.prefetchSize=" + batchSize);
+                    mc = session.createConsumer(q);
+
+                    idList = idsByQueue.get(queueName);
+                } catch (Exception e) {
+                    // Release whatever was opened before the failure.
+                    close();
+                    throw e;
+                }
+            }
+
+            @Override
+            public void run() {
+
+                try {
+
+                    int count = 0;
+
+                    // Keep reading as long as it hasn't been told to shutdown
+                    while (!shutdown) {
+
+                        if (idList.size() >= totalToExpect) {
+                            LOG.info("Got {} for q: {}", idList.size(), qName);
+                            session.commit();
+                            break;
+                        }
+                        Message m = mc.receive(4000);
+
+                        if (m != null) {
+
+                            // We received a non-null message, add the ID to our list
+                            idList.add(m.getStringProperty("KEY"));
+
+                            count++;
+
+                            // If we've reached our batch size, commit the batch and reset the count
+                            if (count == batchSize) {
+                                session.commit();
+                                count = 0;
+                            }
+                        } else {
+
+                            // We didn't receive anything this time, commit any current batch and reset the count
+                            session.commit();
+                            count = 0;
+
+                            if (idList.size() < totalToExpect) {
+                                LOG.info("did not receive on {}, current count: {}", qName, idList.size());
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    LOG.error("Consumer thread for {} failed", qName, e);
+                } finally {
+
+                    // Once we exit, close everything
+                    close();
+                }
+            }
+
+            /**
+             * Asks the thread to leave its receive loop; it exits after the current receive call returns.
+             */
+            public void shutdown() {
+                shutdown = true;
+            }
+
+            /**
+             * Best-effort close of the consumer, session and connection. Failures are logged
+             * and otherwise ignored.
+             */
+            public void close() {
+
+                try {
+                    if (mc != null) {
+                        mc.close();
+                    }
+                } catch (Exception e) {
+                    LOG.debug("Ignoring error while closing consumer", e);
+                }
+
+                try {
+                    if (session != null) {
+                        session.close();
+                    }
+                } catch (Exception e) {
+                    LOG.debug("Ignoring error while closing session", e);
+                }
+
+                try {
+                    if (qc != null) {
+                        qc.close();
+                    }
+                } catch (Exception e) {
+                    LOG.debug("Ignoring error while closing connection", e);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java
new file mode 100644
index 0000000..4ba6526
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5274Test.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Sends messages with a time-to-live to a queue, receives and rolls back each of them on a
+ * connection that allows no redeliveries, and then checks through JMX that the queue
+ * reports a size of zero.
+ */
+public class AMQ5274Test {
+    static final Logger LOG = LoggerFactory.getLogger(AMQ5274Test.class);
+    String activemqURL;
+    BrokerService brokerService;
+    ActiveMQQueue dest = new ActiveMQQueue("TestQ");
+
+
+    /**
+     * Starts a non-persistent broker on an ephemeral TCP port with a default destination
+     * policy whose expireMessagesPeriod is 1000.
+     */
+    @Before
+    public void startBroker() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        defaultPolicy.setExpireMessagesPeriod(1000);
+        policyMap.setDefaultEntry(defaultPolicy);
+        brokerService.setDestinationPolicy(policyMap);
+        activemqURL = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();
+        brokerService.start();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+        }
+    }
+
+    @Test
+    public void test() throws Exception {
+        LOG.info("Starting Test");
+        assertTrue(brokerService.isStarted());
+
+        produce();
+        consumeAndRollback();
+
+        // check reported queue size using JMX
+        long queueSize = getQueueSize();
+        assertEquals("Queue " + dest.getPhysicalName() + " not empty, reporting " + queueSize + " messages.", 0, queueSize);
+    }
+
+    /**
+     * Receives messages until none arrives within four seconds, rolling the transaction
+     * back one second after each receive. The connection's redelivery policy allows zero
+     * redeliveries.
+     */
+    private void consumeAndRollback() throws JMSException, InterruptedException {
+        ActiveMQConnection connection = createConnection();
+        try {
+            RedeliveryPolicy noRedelivery = new RedeliveryPolicy();
+            noRedelivery.setMaximumRedeliveries(0);
+            connection.setRedeliveryPolicy(noRedelivery);
+            connection.start();
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            MessageConsumer consumer = session.createConsumer(dest);
+            Message m;
+            while ((m = consumer.receive(4000)) != null) {
+                LOG.info("Got:" + m);
+                TimeUnit.SECONDS.sleep(1);
+                session.rollback();
+            }
+        } finally {
+            // Close the connection even when receiving or rolling back fails.
+            connection.close();
+        }
+    }
+
+    /**
+     * Sends 20 text messages with a time-to-live of 10000 ms to the test queue.
+     */
+    private void produce() throws Exception {
+        Connection connection = createConnection();
+        try {
+            connection.start();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(dest);
+            producer.setTimeToLive(10000);
+            for (int i = 0; i < 20; i++) {
+                producer.send(session.createTextMessage("i=" + i));
+            }
+        } finally {
+            // Close the connection even when sending fails.
+            connection.close();
+        }
+    }
+
+    private ActiveMQConnection createConnection() throws JMSException {
+        return (ActiveMQConnection) new ActiveMQConnectionFactory(activemqURL).createConnection();
+    }
+
+
+    /**
+     * Reads the queue size of the test destination from its JMX queue view.
+     *
+     * @return the queue size reported by the broker
+     * @throws Exception if the MBean cannot be reached; the error is logged before it is rethrown
+     */
+    public long getQueueSize() throws Exception {
+        long queueSize = 0;
+        try {
+            QueueViewMBean queueViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(
+                BrokerMBeanSupport.createDestinationName(brokerService.getBrokerObjectName(), dest),
+                QueueViewMBean.class, false);
+            queueSize = queueViewMBean.getQueueSize();
+            LOG.info("QueueSize for destination {} is {}", dest, queueSize);
+        } catch (Exception ex) {
+            LOG.error("Error retrieving QueueSize from JMX ", ex);
+            throw ex;
+        }
+        return queueSize;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java
new file mode 100644
index 0000000..ff10b0d
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.Random;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Sends an uncompressed bytes message to a consumer whose connection has compression
+ * enabled, then modifies and "re-publishes" the consumed message to check that its
+ * compressed flag and its content are left intact.
+ */
+public class AMQ5381Test {
+
+    public static final byte[] ORIG_MSG_CONTENT = randomByteArray();
+    public static final String AMQ5381_EXCEPTION_MESSAGE = "java.util.zip.DataFormatException: incorrect header check";
+
+    // Upper bound for waiting on the test message, so that a lost message fails the test
+    // instead of blocking it forever.
+    private static final long RECEIVE_TIMEOUT_MS = 30000;
+
+    private BrokerService brokerService;
+    private String brokerURI;
+
+    @Rule public TestName name = new TestName();
+
+    @Before
+    public void startBroker() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(false);
+        brokerService.addConnector("tcp://localhost:0");
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        brokerURI = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+        }
+    }
+
+    /**
+     * Creates and starts a connection to the test broker.
+     *
+     * @param useCompression value for the connection factory's useCompression setting
+     */
+    private ActiveMQConnection createConnection(boolean useCompression) throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
+        factory.setUseCompression(useCompression);
+        Connection connection = factory.createConnection();
+        connection.start();
+        return (ActiveMQConnection) connection;
+    }
+
+    @Test
+    public void amq5381Test() throws Exception {
+
+        // Consumer Configured for (useCompression=true)
+        final ActiveMQConnection consumerConnection = createConnection(true);
+        final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue consumerQueue = consumerSession.createQueue(name.getMethodName());
+        final MessageConsumer consumer = consumerSession.createConsumer(consumerQueue);
+
+        // Producer Configured for (useCompression=false)
+        final ActiveMQConnection producerConnection = createConnection(false);
+        final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue producerQueue = producerSession.createQueue(name.getMethodName());
+
+        try {
+
+            final ActiveMQBytesMessage messageProduced = (ActiveMQBytesMessage) producerSession.createBytesMessage();
+            messageProduced.writeBytes(ORIG_MSG_CONTENT);
+            Assert.assertFalse(messageProduced.isReadOnlyBody());
+
+            Assert.assertFalse(
+                "Produced Message's 'compressed' flag should remain false until the message is sent (where it will be compressed, if necessary)",
+                messageProduced.isCompressed());
+
+            final MessageProducer producer = producerSession.createProducer(null);
+            producer.send(producerQueue, messageProduced);
+
+            Assert.assertEquals("Once sent, the produced Message's 'compressed' flag should match its Connection's 'useCompression' flag",
+                producerConnection.isUseCompression(), messageProduced.isCompressed());
+
+            // Bounded wait: a missing message must fail the test rather than hang it.
+            final ActiveMQBytesMessage messageConsumed = (ActiveMQBytesMessage) consumer.receive(RECEIVE_TIMEOUT_MS);
+            Assert.assertNotNull(messageConsumed);
+            Assert.assertTrue("Consumed Message should be read-only", messageConsumed.isReadOnlyBody());
+            Assert.assertEquals("Consumed Message's 'compressed' flag should match the produced Message's 'compressed' flag",
+                                messageProduced.isCompressed(), messageConsumed.isCompressed());
+
+            // ensure consumed message content matches what was originally set
+            final byte[] consumedMsgContent = new byte[(int) messageConsumed.getBodyLength()];
+            messageConsumed.readBytes(consumedMsgContent);
+
+            Assert.assertTrue("Consumed Message content should match the original Message content", Arrays.equals(ORIG_MSG_CONTENT, consumedMsgContent));
+
+            // make message writable so the consumer can modify and reuse it
+            makeWritable(messageConsumed);
+
+            // modify message, attempt to trigger DataFormatException due
+            // to old incorrect compression logic
+            try {
+                messageConsumed.setStringProperty(this.getClass().getName(), "test");
+            } catch (JMSException jmsE) {
+                if (AMQ5381_EXCEPTION_MESSAGE.equals(jmsE.getMessage())) {
+                    StringWriter sw = new StringWriter();
+                    PrintWriter pw = new PrintWriter(sw);
+                    jmsE.printStackTrace(pw);
+
+                    Assert.fail("AMQ5381 Error State Achieved: attempted to decompress BytesMessage contents that are not compressed\n" + sw.toString());
+                } else {
+                    throw jmsE;
+                }
+            }
+
+            Assert.assertEquals(
+                "The consumed Message's 'compressed' flag should still match the produced Message's 'compressed' flag after it has been made writable",
+                messageProduced.isCompressed(), messageConsumed.isCompressed());
+
+            // simulate re-publishing message
+            simulatePublish(messageConsumed);
+
+            // ensure consumed message content matches what was originally set
+            final byte[] modifiedMsgContent = new byte[(int) messageConsumed.getBodyLength()];
+            messageConsumed.readBytes(modifiedMsgContent);
+
+            Assert.assertTrue(
+                "After the message properties are modified and it is re-published, its message content should still match the original message content",
+                Arrays.equals(ORIG_MSG_CONTENT, modifiedMsgContent));
+        } finally {
+            // Closing a JMS connection also closes its sessions, producers and consumers.
+            // The nested finally makes sure the second connection is closed even if
+            // closing the first one fails.
+            try {
+                producerConnection.close();
+            } finally {
+                consumerConnection.close();
+            }
+        }
+    }
+
+    protected static final int MAX_RANDOM_BYTE_ARRAY_SIZE_KB = 128;
+
+    /**
+     * Creates a non-empty array of random bytes.
+     *
+     * @return an array of 1 to {@code MAX_RANDOM_BYTE_ARRAY_SIZE_KB * 1024} random bytes
+     */
+    protected static byte[] randomByteArray() {
+        final Random random = new Random();
+        // "1 +" rules out a zero-length payload, which would leave the test with nothing to compare.
+        final byte[] byteArray = new byte[1 + random.nextInt(MAX_RANDOM_BYTE_ARRAY_SIZE_KB * 1024)];
+        random.nextBytes(byteArray);
+
+        return byteArray;
+    }
+
+    /**
+     * Clears the read-only flags of the message body and properties.
+     */
+    protected static void makeWritable(final ActiveMQMessage message) {
+        message.setReadOnlyBody(false);
+        message.setReadOnlyProperties(false);
+    }
+
+    /**
+     * Calls {@code reset()} and {@code onSend()} on the message, as a stand-in for sending it again.
+     */
+    protected static void simulatePublish(final ActiveMQBytesMessage message) throws JMSException {
+        message.reset();
+        message.onSend();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java
new file mode 100644
index 0000000..751d488
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5421Test.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates a large number of temporary queues on one session and sends and receives one
+ * message on each, against a broker configured with an AbortSlowAckConsumerStrategy.
+ */
+public class AMQ5421Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ5421Test.class);
+
+    private static final int DEST_COUNT = 1000;
+    private final Destination[] destination = new Destination[DEST_COUNT];
+    private final MessageProducer[] producer = new MessageProducer[DEST_COUNT];
+    private BrokerService brokerService;
+    private String connectionUri;
+
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory(connectionUri);
+        conFactory.setWatchTopicAdvisories(false);
+        return conFactory;
+    }
+
+    /**
+     * Builds the slow consumer strategy installed on the broker: check period 2000,
+     * max time since last ack 5000, idle consumers not ignored.
+     */
+    protected AbortSlowAckConsumerStrategy createSlowConsumerStrategy() {
+        AbortSlowAckConsumerStrategy strategy = new AbortSlowAckConsumerStrategy();
+        strategy.setCheckPeriod(2000);
+        strategy.setMaxTimeSinceLastAck(5000);
+        strategy.setIgnoreIdleConsumers(false);
+
+        return strategy;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        brokerService = BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false&useJmx=true"));
+        PolicyEntry policy = new PolicyEntry();
+
+        policy.setSlowConsumerStrategy(createSlowConsumerStrategy());
+        policy.setQueuePrefetch(10);
+        policy.setTopicPrefetch(10);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        brokerService.setDestinationPolicy(pMap);
+        brokerService.addConnector("tcp://0.0.0.0:0");
+        brokerService.start();
+
+        connectionUri = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString();
+    }
+
+    @Test
+    public void testManyTempDestinations() throws Exception {
+        Connection connection = createConnectionFactory().createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            for (int i = 0; i < DEST_COUNT; i++) {
+                destination[i] = session.createTemporaryQueue();
+                LOG.debug("Created temp queue: {}", i);
+            }
+
+            for (int i = 0; i < DEST_COUNT; i++) {
+                producer[i] = session.createProducer(destination[i]);
+                LOG.debug("Created producer: {}", i);
+                TextMessage msg = session.createTextMessage(" testMessage " + i);
+                producer[i].send(msg);
+                LOG.debug("message sent: {}", i);
+                // The consumers are deliberately left open until the connection is closed.
+                MessageConsumer consumer = session.createConsumer(destination[i]);
+                Message message = consumer.receive(1000);
+                // Fail with a clear assertion instead of a NullPointerException on a timeout.
+                Assert.assertNotNull("No message received on temp queue " + i, message);
+                Assert.assertTrue(message.equals(msg));
+            }
+
+            for (int i = 0; i < DEST_COUNT; i++) {
+                producer[i].close();
+            }
+        } finally {
+            // Close the connection even when an assertion or a JMS call fails.
+            connection.close();
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        // setUp() may have failed before the broker was created.
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5450Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5450Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5450Test.java
new file mode 100644
index 0000000..6a2dc52
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5450Test.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.*;
+
+/**
+ * Exercises a {@link MultiKahaDBPersistenceAdapter} configured with filtered KahaDB stores
+ * keyed by post-fix wildcard queue patterns (names ending in {@code .dlq}) plus one store
+ * with no pattern, and checks which of those stores end up holding the test queues.
+ */
+public class AMQ5450Test {
+
+    static final Logger LOG = LoggerFactory.getLogger(AMQ5450Test.class);
+    private final static int maxFileLength = 1024*1024*32;
+
+    private final static String POSTFIX_DESTINATION_NAME = ".dlq";
+
+    private final static String DESTINATION_NAME = "test" + POSTFIX_DESTINATION_NAME;
+    private final static String DESTINATION_NAME_2 = "2.test" + POSTFIX_DESTINATION_NAME;
+    private final static String DESTINATION_NAME_3 = "3.2.test" + POSTFIX_DESTINATION_NAME;
+
+    private final static String[] DESTS = new String[] {DESTINATION_NAME, DESTINATION_NAME_2, DESTINATION_NAME_3, DESTINATION_NAME, DESTINATION_NAME};
+
+
+    BrokerService broker;
+
+    // Underlying store of every filtered adapter, keyed by its queue pattern.
+    // The pattern-less adapter is stored under the null key, so this must stay a HashMap.
+    private final HashMap<Object, PersistenceAdapter> adapters = new HashMap<Object, PersistenceAdapter>();
+
+    /**
+     * Stops the broker, if one was started, and waits for the shutdown to complete.
+     *
+     * @throws Exception if the broker fails to stop
+     */
+    @After
+    public void tearDown() throws Exception {
+        // broker stays null when prepareBrokerWithMultiStore failed before assigning it
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Builds a JMX-less broker named "localhost" on top of the given store, starts it and
+     * waits until it is started.
+     *
+     * @param persistenceAdapter the store the broker should use
+     * @return the started broker
+     * @throws Exception if the broker fails to start
+     */
+    protected BrokerService createAndStartBroker(PersistenceAdapter persistenceAdapter) throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setUseJmx(false);
+        answer.setBrokerName("localhost");
+        answer.setPersistenceAdapter(persistenceAdapter);
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.start();
+        answer.waitUntilStarted();
+        return answer;
+    }
+
+    @Test
+    public void testPostFixMatch() throws Exception {
+        doTestPostFixMatch(false);
+    }
+
+    @Test
+    public void testPostFixCompositeMatch() throws Exception {
+        doTestPostFixMatch(true);
+    }
+
+    /**
+     * Sends one message to each of the three test queues, checks that every queue exists
+     * with exactly one stored message, and then checks how the queues are spread over the
+     * configured stores.
+     *
+     * @param useComposite {@code true} to register a single filtered store with a composite
+     *                     pattern, {@code false} to register one filtered store per path length
+     * @throws Exception if broker setup or messaging fails
+     */
+    private void doTestPostFixMatch(boolean useComposite) throws Exception {
+        prepareBrokerWithMultiStore(useComposite);
+
+        sendMessage(DESTINATION_NAME, "test 1");
+        sendMessage(DESTINATION_NAME_2, "test 1");
+        sendMessage(DESTINATION_NAME_3, "test 1");
+
+        assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)));
+        assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)));
+        assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_3)));
+
+        for (String dest : DESTS) {
+            Destination destination2 = broker.getDestination(new ActiveMQQueue(dest));
+            assertNotNull(destination2);
+            assertEquals(1, destination2.getMessageStore().getMessageCount());
+        }
+
+        // index the stores by how many destinations each one holds; stores with the same
+        // count collapse onto one key, which is what the size assertion below relies on
+        HashMap<Integer, PersistenceAdapter> numDests = new HashMap<Integer, PersistenceAdapter>();
+        for (PersistenceAdapter pa : adapters.values()) {
+            numDests.put(pa.getDestinations().size(), pa);
+        }
+
+        // ensure wildcard does not match any
+        assertTrue("0 in wildcard matcher", adapters.get(null).getDestinations().isEmpty());
+
+        assertEquals("only two values", 2, numDests.size());
+        assertTrue("0 in others", numDests.containsKey(0));
+
+        if (useComposite) {
+            assertTrue("3 in one", numDests.containsKey(3));
+        } else {
+            assertTrue("1 in some", numDests.containsKey(1));
+        }
+
+    }
+
+    /**
+     * Creates a KahaDB store with the test's journal file length and a 5000 cleanup interval.
+     *
+     * @param delete whether to delete all messages from the new store
+     * @return the configured store
+     * @throws IOException if deleting the messages fails
+     */
+    protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
+        KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
+        kaha.setJournalMaxFileLength(maxFileLength);
+        kaha.setCleanupInterval(5000);
+        if (delete) {
+            kaha.deleteAllMessages();
+        }
+        return kaha;
+    }
+
+    /**
+     * Configures a multi-KahaDB store with the filtered adapters under test and starts the
+     * broker on top of it, assigning it to {@link #broker}.
+     *
+     * @param compositeMatch {@code true} to use one adapter whose queue pattern is the
+     *                       comma-separated list {@code *.dlq,*.*.dlq,...} (one entry per
+     *                       element of {@code DESTS}); {@code false} to use four adapters,
+     *                       one per path length
+     * @throws Exception if store or broker setup fails
+     */
+    public void prepareBrokerWithMultiStore(boolean compositeMatch) throws Exception {
+
+        MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
+        multiKahaDBPersistenceAdapter.deleteAllMessages();
+        ArrayList<FilteredKahaDBPersistenceAdapter> filteredAdapters = new ArrayList<FilteredKahaDBPersistenceAdapter>();
+
+        if (compositeMatch) {
+            StringBuilder compositeDestBuf = new StringBuilder();
+            for (int i = 1; i <= DESTS.length; i++) {
+                // entry i is i wildcards joined by '.', followed by the post-fix
+                for (int j = 0; j < i; j++) {
+                    compositeDestBuf.append("*");
+                    if (j + 1 == i) {
+                        compositeDestBuf.append(POSTFIX_DESTINATION_NAME);
+                    } else {
+                        compositeDestBuf.append(".");
+                    }
+                }
+                // separate entries, but no trailing comma after the last one
+                if (i < DESTS.length) {
+                    compositeDestBuf.append(",");
+                }
+            }
+            filteredAdapters.add(createFilteredKahaDBByDestinationPrefix(compositeDestBuf.toString(), true));
+
+        } else {
+            // destination map does not do post fix wild card matches on paths, so we need to cover
+            // each path length
+            filteredAdapters.add(createFilteredKahaDBByDestinationPrefix("*" + POSTFIX_DESTINATION_NAME, true));
+            filteredAdapters.add(createFilteredKahaDBByDestinationPrefix("*.*" + POSTFIX_DESTINATION_NAME, true));
+            filteredAdapters.add(createFilteredKahaDBByDestinationPrefix("*.*.*" + POSTFIX_DESTINATION_NAME, true));
+            filteredAdapters.add(createFilteredKahaDBByDestinationPrefix("*.*.*.*" + POSTFIX_DESTINATION_NAME, true));
+        }
+
+        // ensure wildcard matcher is there for other dests
+        filteredAdapters.add(createFilteredKahaDBByDestinationPrefix(null, true));
+
+        multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(filteredAdapters);
+        broker = createAndStartBroker(multiKahaDBPersistenceAdapter);
+    }
+
+    /**
+     * Creates a filtered adapter backed by a fresh KahaDB store and records that store in
+     * {@link #adapters} under the given pattern.
+     *
+     * @param destinationPrefix queue pattern for the adapter, or {@code null} to leave the
+     *                          adapter without a queue pattern
+     * @param deleteAllMessages whether the backing store should be emptied on creation
+     * @return the filtered adapter
+     * @throws IOException if creating the backing store fails
+     */
+    private FilteredKahaDBPersistenceAdapter createFilteredKahaDBByDestinationPrefix(String destinationPrefix, boolean deleteAllMessages)
+            throws IOException {
+        FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter();
+        template.setPersistenceAdapter(createStore(deleteAllMessages));
+        if (destinationPrefix != null) {
+            template.setQueue(destinationPrefix);
+        }
+        adapters.put(destinationPrefix, template.getPersistenceAdapter());
+        return template;
+    }
+
+    /**
+     * Sends one text message to the named queue over a {@code vm://localhost} connection
+     * using synchronous sends.
+     *
+     * @param destinationName name of the target queue
+     * @param message         text body to send
+     * @throws JMSException if connecting or sending fails
+     */
+    private void sendMessage(String destinationName, String message) throws JMSException {
+        ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost");
+        f.setAlwaysSyncSend(true);
+        Connection c = f.createConnection();
+        try {
+            c.start();
+            Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = s.createProducer(new ActiveMQQueue(destinationName));
+            producer.send(s.createTextMessage(message));
+            producer.close();
+            s.close();
+        } finally {
+            // close, not just stop: stop() only pauses delivery and leaves the connection open
+            c.close();
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java
new file mode 100644
index 0000000..a8739ae
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5567Test.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.File;
+import java.util.concurrent.TimeUnit;
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import junit.framework.Test;
+import org.apache.activemq.broker.BrokerRestartTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter;
+import org.apache.activemq.util.IOHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks, for several persistence adapters, that a message sent in an XA transaction that
+ * has been prepared but not committed is not delivered to a consumer until the
+ * transaction commits.
+ */
+public class AMQ5567Test extends BrokerRestartTestSupport {
+    protected static final Logger LOG = LoggerFactory.getLogger(AMQ5567Test.class);
+    ActiveMQQueue destination = new ActiveMQQueue("Q");
+
+    @Override
+    protected void configureBroker(BrokerService broker) throws Exception {
+        super.configureBroker(broker);
+        broker.setPersistenceAdapter(persistenceAdapter);
+    }
+
+    /**
+     * @return a policy entry whose memory limit is 60 KiB
+     */
+    protected PolicyEntry getDefaultPolicy() {
+        PolicyEntry policy = new PolicyEntry();
+        policy.setMemoryLimit(60*1024);
+        return policy;
+    }
+
+    public void initCombosForTestPreparedTransactionNotDispatched() throws Exception {
+        addPersistenceAdapterCombinations();
+    }
+
+    /**
+     * Sends one message inside a prepared XA transaction followed by one non-transacted
+     * message, and checks that a consumer receives only the non-transacted message until
+     * the transaction is committed.
+     */
+    public void testPreparedTransactionNotDispatched() throws Exception {
+
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+        Message message = createMessage(producerInfo, destination);
+        message.setPersistent(true);
+        message.setTransactionId(txid);
+        connection.send(message);
+
+        connection.send(createPrepareTransaction(connectionInfo, txid));
+
+
+        // send another non tx, will poke dispatch
+        message = createMessage(producerInfo, destination);
+        message.setPersistent(true);
+        connection.send(message);
+
+
+        // Since prepared but not committed.. only one should get delivered
+        StubConnection connectionC = createConnection();
+        ConnectionInfo connectionInfoC = createConnectionInfo();
+        SessionInfo sessionInfoC = createSessionInfo(connectionInfoC);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfoC, destination);
+        connectionC.send(connectionInfoC);
+        connectionC.send(sessionInfoC);
+        connectionC.send(consumerInfo);
+
+        Message m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10));
+        LOG.info("received: " + m);
+        assertNotNull("Got message", m);
+        assertNull("Got non tx message", m.getTransactionId());
+
+        // cannot get the prepared message till commit
+        assertNull(receiveMessage(connectionC));
+        assertNoMessagesLeft(connectionC);
+
+        LOG.info("commit: " + txid);
+        connection.request(createCommitTransaction2Phase(connectionInfo, txid));
+
+        m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10));
+        LOG.info("received: " + m);
+        assertNotNull("Got non null message", m);
+
+    }
+
+    public void initCombosForTestCursorStoreSync() throws Exception {
+        addPersistenceAdapterCombinations();
+    }
+
+    /**
+     * Prepares an XA transaction holding one message, then sends six 10 KiB non-transacted
+     * messages and expects the queue's cache to go from enabled to disabled. Consumes and
+     * acks three messages, commits the transaction, then consumes four more and expects
+     * only the last of those to carry a transaction id.
+     */
+    public void testCursorStoreSync() throws Exception {
+
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+        Message message = createMessage(producerInfo, destination);
+        message.setPersistent(true);
+        message.setTransactionId(txid);
+        connection.request(message);
+
+        connection.request(createPrepareTransaction(connectionInfo, txid));
+
+        QueueViewMBean proxy = getProxyToQueueViewMBean();
+        assertTrue("cache is enabled", proxy.isCacheEnabled());
+
+        // send another non tx, will fill cursor
+        String payload = new String(new byte[10*1024]);
+        for (int i = 0; i < 6; i++) {
+            message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            ((TextMessage) message).setText(payload);
+            connection.request(message);
+        }
+
+        assertFalse("cache is disabled", proxy.isCacheEnabled());
+
+        StubConnection connectionC = createConnection();
+        ConnectionInfo connectionInfoC = createConnectionInfo();
+        SessionInfo sessionInfoC = createSessionInfo(connectionInfoC);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfoC, destination);
+        connectionC.send(connectionInfoC);
+        connectionC.send(sessionInfoC);
+        connectionC.send(consumerInfo);
+
+        Message m = null;
+        for (int i = 0; i < 3; i++) {
+            m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10));
+            LOG.info("received: " + m);
+            assertNotNull("Got message", m);
+            assertNull("Got non tx message", m.getTransactionId());
+            connectionC.request(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
+        }
+
+        LOG.info("commit: " + txid);
+        connection.request(createCommitTransaction2Phase(connectionInfo, txid));
+        // consume the rest including the 2pc send in TX
+
+        for (int i = 0; i < 4; i++) {
+            m = receiveMessage(connectionC, TimeUnit.SECONDS.toMillis(10));
+            LOG.info("received[" + i + "] " + m);
+            assertNotNull("Got message", m);
+            if (i == 3) {
+                assertNotNull("Got  tx message", m.getTransactionId());
+            } else {
+                assertNull("Got non tx message", m.getTransactionId());
+            }
+            connectionC.request(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
+        }
+    }
+
+    /**
+     * Registers KahaDB, LevelDB and JDBC persistence adapters, each pointed at the default
+     * data directory, as the values for the "persistenceAdapter" combination.
+     * Shared by both {@code initCombosFor...} methods, which used to duplicate this setup.
+     */
+    private void addPersistenceAdapterCombinations() throws Exception {
+        PersistenceAdapter[] persistenceAdapters = new PersistenceAdapter[]{
+                new KahaDBPersistenceAdapter(),
+                new LevelDBPersistenceAdapter(),
+                new JDBCPersistenceAdapter(JDBCPersistenceAdapter.createDataSource(IOHelper.getDefaultDataDirectory()), new OpenWireFormat())
+        };
+        for (PersistenceAdapter adapter : persistenceAdapters) {
+            adapter.setDirectory(new File(IOHelper.getDefaultDataDirectory()));
+        }
+        addCombinationValues("persistenceAdapter", persistenceAdapters);
+    }
+
+    /**
+     * Creates a JMX proxy for the view MBean of the test queue on broker "localhost".
+     *
+     * @return the queue view proxy
+     * @throws MalformedObjectNameException if the MBean object name cannot be built
+     * @throws JMSException if reading the queue name fails
+     */
+    private QueueViewMBean getProxyToQueueViewMBean()
+            throws MalformedObjectNameException, JMSException {
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq"
+                + ":destinationType=Queue,destinationName=" + destination.getQueueName()
+                + ",type=Broker,brokerName=localhost");
+        QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+                .newProxyInstance(queueViewMBeanName,
+                        QueueViewMBean.class, true);
+        return proxy;
+    }
+
+    public static Test suite() {
+        return suite(AMQ5567Test.class);
+    }
+
+}

Reply via email to