http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java
new file mode 100644
index 0000000..d5e5835
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4222Test.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.TransportConnection;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.transport.vm.VMTransportFactory;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks that a temporary reply queue is removed from the broker once the connection that
+ * created it is closed, and that the broker-side producer exchange of the connection that
+ * replied to it no longer references a region or region destination.
+ *
+ * @author <a href="http://www.christianposta.com/blog">Christian Posta</a>
+ */
+public class AMQ4222Test extends TestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4222Test.class);
+
+    /** Broker started in {@link #setUp()} and stopped in {@link #tearDown()}. */
+    protected BrokerService brokerService;
+
+    /**
+     * Selects queue destinations (rather than topics) and starts the broker.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        topic = false;
+        brokerService = createBroker();
+    }
+
+    /**
+     * Stops the broker (if one was created) and always runs the superclass tear-down so the
+     * {@code super.setUp()} call made in {@link #setUp()} is balanced.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            // setUp() may have failed before the broker was assigned
+            if (brokerService != null) {
+                brokerService.stop();
+                brokerService.waitUntilStopped();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Creates and starts a non-persistent broker named {@code localhost} without connectors.
+     *
+     * @return the started broker
+     * @throws Exception if the broker cannot be created or started
+     */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = BrokerFactory.createBroker(new URI("broker:()/localhost?persistent=false"));
+        broker.start();
+        broker.waitUntilStarted();
+        return broker;
+    }
+
+    /**
+     * @return a connection factory that talks to the broker through the in-VM transport
+     */
+    @Override
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory("vm://localhost");
+    }
+
+    /**
+     * Runs a request/reply exchange over a temporary queue, closes the requesting connection
+     * and then inspects the broker for left-over references to the temporary destination.
+     *
+     * @throws Exception on any JMS, broker or reflection failure
+     */
+    public void testTempQueueCleanedUp() throws Exception {
+
+        Destination requestQueue = createDestination();
+
+        Connection producerConnection = createConnection();
+        Connection consumerConnection = null;
+        try {
+            producerConnection.start();
+            Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            MessageProducer producer = producerSession.createProducer(requestQueue);
+            Destination replyTo = producerSession.createTemporaryQueue();
+            MessageConsumer producerSessionConsumer = producerSession.createConsumer(replyTo);
+
+            final CountDownLatch countDownLatch = new CountDownLatch(1);
+            // let's listen to the response on the queue
+            producerSessionConsumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        if (message instanceof TextMessage) {
+                            LOG.info("You got a message: " + ((TextMessage) message).getText());
+                            countDownLatch.countDown();
+                        }
+                    } catch (JMSException e) {
+                        LOG.error("error reading the response from the temp queue", e);
+                    }
+                }
+            });
+
+            producer.send(createRequest(producerSession, replyTo));
+
+            consumerConnection = createConnection();
+            consumerConnection.start();
+            Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = consumerSession.createConsumer(requestQueue);
+            // anonymous producer: the destination is supplied per send from JMSReplyTo
+            final MessageProducer consumerProducer = consumerSession.createProducer(null);
+
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        consumerProducer.send(message.getJMSReplyTo(), message);
+                    } catch (JMSException e) {
+                        LOG.error("error sending a response on the temp queue", e);
+                    }
+                }
+            });
+
+            // The rest of the test is only meaningful once the reply has made the round trip,
+            // so fail here instead of silently continuing. await() returns as soon as the
+            // reply arrives; the generous timeout only matters on slow machines.
+            assertTrue("no reply received on the temp queue",
+                    countDownLatch.await(10, TimeUnit.SECONDS));
+
+            // producer has not gone away yet...
+            org.apache.activemq.broker.region.Destination tempDestination = getDestination(brokerService,
+                    (ActiveMQDestination) replyTo);
+            assertNotNull(tempDestination);
+
+            // clean up
+            producer.close();
+            producerSession.close();
+            producerConnection.close();
+
+            // producer has gone away.. so the temp queue should not exist anymore... let's see..
+            tempDestination = getDestination(brokerService,
+                    (ActiveMQDestination) replyTo);
+            assertNull(tempDestination);
+
+            // now.. the connection on the broker side for the dude producing to the temp dest will
+            // still have a reference in his producerBrokerExchange.. this will keep the destination
+            // from being reclaimed by GC if there is never another send that producer makes...
+            // let's see if that reference is there...
+            final TransportConnector connector = VMTransportFactory.CONNECTORS.get("localhost");
+            assertNotNull(connector);
+            // only the replying (consumer) connection should be left on the connector
+            assertTrue(Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return connector.getConnections().size() == 1;
+                }
+            }));
+            TransportConnection transportConnection = connector.getConnections().get(0);
+            Map<ProducerId, ProducerBrokerExchange> exchanges = getProducerExchangeFromConn(transportConnection);
+            assertEquals(1, exchanges.size());
+            ProducerBrokerExchange exchange = exchanges.values().iterator().next();
+
+            // so this is the reason for the test... we don't want these exchanges to hold a reference
+            // to a region destination.. after a send is completed, the destination is not used anymore on
+            // a producer exchange
+            assertNull(exchange.getRegionDestination());
+            assertNull(exchange.getRegion());
+        } finally {
+            // Closing an already closed JMS connection is a no-op, so this is safe on the
+            // success path and releases both connections when an assertion fails early.
+            try {
+                producerConnection.close();
+            } finally {
+                if (consumerConnection != null) {
+                    consumerConnection.close();
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Reads the private {@code producerExchanges} field of a broker-side connection through
+     * reflection.
+     *
+     * @param transportConnection the broker-side connection to inspect
+     * @return the value of the connection's {@code producerExchanges} field
+     * @throws NoSuchFieldException   if the field does not exist on {@link TransportConnection}
+     * @throws IllegalAccessException if the field cannot be read
+     */
+    @SuppressWarnings("unchecked")
+    private Map<ProducerId, ProducerBrokerExchange> getProducerExchangeFromConn(TransportConnection transportConnection) throws NoSuchFieldException, IllegalAccessException {
+        Field f = TransportConnection.class.getDeclaredField("producerExchanges");
+        f.setAccessible(true);
+        return (Map<ProducerId, ProducerBrokerExchange>) f.get(transportConnection);
+    }
+
+    /**
+     * Builds the request message sent by the producer.
+     *
+     * @param session the session used to create the message
+     * @param replyTo the destination stored as the message's JMSReplyTo
+     * @return a text message with body {@code "Payload"} and the given reply destination
+     * @throws JMSException if the message cannot be created or configured
+     */
+    private Message createRequest(Session session, Destination replyTo) throws JMSException {
+        Message message = session.createTextMessage("Payload");
+        message.setJMSReplyTo(replyTo);
+        return message;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
new file mode 100644
index 0000000..8e6a96f
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4323Test.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.File;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.ConsumerThread;
+import org.apache.activemq.util.ProducerThread;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Produces and consumes a batch of messages against a KahaDB-backed broker and checks that
+ * the number of files in the KahaDB directory returns to its initial value afterwards.
+ */
+public class AMQ4323Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4323Test.class);
+
+    BrokerService broker = null;
+    File kahaDbDir = null;
+    private final Destination destination = new ActiveMQQueue("q");
+    // 1 KiB of zero bytes used as the fixed part of every message body
+    final String payload = new String(new byte[1024]);
+
+    /**
+     * Creates, configures and starts a persistent broker with a TCP connector on an
+     * ephemeral port. The KahaDB directory is removed before the broker starts.
+     *
+     * @param delete whether the broker should delete all messages on startup
+     * @throws Exception if the broker cannot be configured or started
+     */
+    protected void startBroker(boolean delete) throws Exception {
+        broker = new BrokerService();
+
+        //Start with a clean directory
+        kahaDbDir = new File(broker.getBrokerDataDirectory(), "KahaDB");
+        deleteDir(kahaDbDir);
+
+        broker.setSchedulerSupport(false);
+        broker.setDeleteAllMessagesOnStartup(delete);
+        broker.setPersistent(true);
+        broker.setUseJmx(false);
+        broker.addConnector("tcp://localhost:0");
+
+        PolicyMap map = new PolicyMap();
+        PolicyEntry entry = new PolicyEntry();
+        entry.setUseCache(false);
+        map.setDefaultEntry(entry);
+        broker.setDestinationPolicy(map);
+
+        configurePersistence(broker, delete);
+
+        // log before the call so the message matches what is actually happening
+        LOG.info("Starting broker..");
+        broker.start();
+    }
+
+    /**
+     * Tunes the broker's KahaDB adapter: small journal files and frequent checkpoint and
+     * cleanup runs. When messages are kept on start, index recovery is forced.
+     *
+     * @param brokerService    the broker whose persistence adapter is configured
+     * @param deleteAllOnStart whether the broker deletes all messages on startup
+     * @throws Exception if the persistence adapter cannot be obtained
+     */
+    protected void configurePersistence(BrokerService brokerService, boolean deleteAllOnStart) throws Exception {
+        KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
+
+        // ensure there are a bunch of data files but multiple entries in each
+        adapter.setJournalMaxFileLength(1024 * 20);
+
+        // speed up the test case, checkpoint and cleanup early and often
+        adapter.setCheckpointInterval(500);
+        adapter.setCleanupInterval(500);
+
+        if (!deleteAllOnStart) {
+            adapter.setForceRecoverIndex(true);
+        }
+
+    }
+
+    /**
+     * Recursively deletes a file or directory.
+     *
+     * @param dir the file or directory to delete
+     * @return {@code true} if {@code dir} and everything below it was deleted; {@code false}
+     *         as soon as one entry cannot be deleted
+     */
+    private boolean deleteDir(File dir) {
+        if (dir.isDirectory()) {
+            // list() returns null when the directory cannot be read; fall through to
+            // delete(), which then reports the failure
+            String[] children = dir.list();
+            if (children != null) {
+                for (String child : children) {
+                    if (!deleteDir(new File(dir, child))) {
+                        return false;
+                    }
+                }
+            }
+        }
+
+        return dir.delete();
+    }
+
+    /**
+     * Counts the direct entries of a directory.
+     *
+     * @param dir the directory to inspect
+     * @return the number of entries, or 0 if {@code dir} is not a directory or cannot be listed
+     */
+    private int getFileCount(File dir){
+        if (dir.isDirectory()) {
+            String[] children = dir.list();
+            // list() returns null on an I/O error
+            return children == null ? 0 : children.length;
+        }
+
+        return 0;
+    }
+
+    /**
+     * Sends and then receives 500 messages and waits for the KahaDB directory to shrink back
+     * to the four files present right after broker start.
+     *
+     * @throws Exception on any broker or JMS failure
+     */
+    @Test
+    public void testCleanupOfFiles() throws Exception {
+        final int messageCount = 500;
+        startBroker(true);
+        try {
+            int initialFileCount = getFileCount(kahaDbDir);
+            assertEquals(4, initialFileCount);
+
+            Connection connection = new ActiveMQConnectionFactory(
+                    broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
+            try {
+                connection.start();
+                Session producerSess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                Session consumerSess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+                ProducerThread producer = new ProducerThread(producerSess, destination) {
+                    @Override
+                    protected Message createMessage(int i) throws Exception {
+                        return sess.createTextMessage(payload + "::" + i);
+                    }
+                };
+                producer.setMessageCount(messageCount);
+                ConsumerThread consumer = new ConsumerThread(consumerSess, destination);
+                consumer.setBreakOnNull(false);
+                consumer.setMessageCount(messageCount);
+
+                // produce everything first, then drain the queue
+                producer.start();
+                producer.join();
+
+                consumer.start();
+                consumer.join();
+
+                assertEquals("consumer got all produced messages", producer.getMessageCount(), consumer.getReceived());
+
+                // verify cleanup
+                assertTrue("gc worked", Wait.waitFor(new Wait.Condition() {
+                    @Override
+                    public boolean isSatisified() throws Exception {
+                        int currentFileCount = getFileCount(kahaDbDir);
+                        LOG.info("current filecount:" + currentFileCount);
+                        return 4 == currentFileCount;
+                    }
+                }));
+            } finally {
+                connection.close();
+            }
+        } finally {
+            // stop the broker even when an assertion above fails, so it does not outlive the test
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java
new file mode 100644
index 0000000..aa3ac2c
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4356Test.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that a durable subscription on a virtual topic is reported as active, then
+ * inactive after its consumer is closed, and is gone after unsubscribe, including after a
+ * broker restart that keeps stored state.
+ */
+public class AMQ4356Test {
+
+    private static final String BROKER_ADDRESS = "tcp://localhost:0";
+    private static final String CLIENT_ID = "AMQ4356Test";
+    private static final String SUBSCRIPTION_NAME = "AMQ4356Test";
+
+    // per-test broker; kept as instance state so nothing is shared between test instances
+    private BrokerService brokerService;
+    private String connectionUri;
+    private ActiveMQConnectionFactory cf;
+
+    /**
+     * Creates and starts a JMX-enabled broker with a TCP connector on an ephemeral port and
+     * records the connector's publishable connect string.
+     *
+     * @param deleteOnStart whether the broker deletes all messages on startup
+     * @throws Exception if the broker cannot be started
+     */
+    private void createBroker(boolean deleteOnStart) throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setUseJmx(true);
+        brokerService.setDeleteAllMessagesOnStartup(deleteOnStart);
+        connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+    }
+
+    /** Starts a broker that deletes all messages on startup. */
+    private void startBroker() throws Exception {
+        createBroker(true);
+    }
+
+    /** Stops the current broker and starts a new one that keeps existing messages. */
+    private void restartBroker() throws Exception {
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+        createBroker(false);
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        startBroker();
+        cf = new ActiveMQConnectionFactory(connectionUri);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        // setUp() may have failed before a broker was created
+        if (brokerService != null) {
+            brokerService.stop();
+            brokerService.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Walks a durable subscriber on the virtual topic through active, inactive and
+     * unsubscribed states and checks the broker admin view at each step.
+     *
+     * @throws Exception on any broker or JMS failure
+     */
+    @Test
+    public void testVirtualTopicUnsubDurable() throws Exception {
+        Connection connection = cf.createConnection();
+        try {
+            connection.setClientID(CLIENT_ID);
+            connection.start();
+
+            // create consumer 'cluster'
+            ActiveMQQueue queue1 = new ActiveMQQueue(getVirtualTopicConsumerName());
+            ActiveMQQueue queue2 = new ActiveMQQueue(getVirtualTopicConsumerName());
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer c1 = session.createConsumer(queue1);
+            c1.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                }
+            });
+            MessageConsumer c2 = session.createConsumer(queue2);
+            c2.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                }
+            });
+
+            ActiveMQTopic topic = new ActiveMQTopic(getVirtualTopicName());
+            MessageConsumer c3 = session.createDurableSubscriber(topic, SUBSCRIPTION_NAME);
+
+            assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+            assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+            // closing the consumer leaves the durable subscription behind as inactive
+            c3.close();
+
+            // create topic producer
+            MessageProducer producer = session.createProducer(topic);
+            assertNotNull(producer);
+
+            int total = 10;
+            for (int i = 0; i < total; i++) {
+                producer.send(session.createTextMessage("message: " + i));
+            }
+
+            assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+            assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+            session.unsubscribe(SUBSCRIPTION_NAME);
+            connection.close();
+
+            assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+            assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+            restartBroker();
+
+            assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length);
+            assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+        } finally {
+            // no-op when the connection was already closed above; releases it when an
+            // assertion fails before that point
+            connection.close();
+        }
+    }
+
+    /** @return the name of the virtual topic messages are published to */
+    protected String getVirtualTopicName() {
+        return "VirtualTopic.TEST";
+    }
+
+    /** @return the name of the consumer queue attached to the virtual topic */
+    protected String getVirtualTopicConsumerName() {
+        return "Consumer.A.VirtualTopic.TEST";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java
new file mode 100644
index 0000000..0e1c465
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4361Test.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import 
org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
+import 
org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks that {@code MessageProducer.close()} completes while another thread is stuck
+ * publishing to a destination with a tiny memory limit and producer flow control enabled,
+ * and that the stuck publisher terminates with an exception.
+ */
+public class AMQ4361Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4361Test.class);
+
+    /** How long to wait for the closing and publishing threads to finish, in milliseconds. */
+    private static final long THREAD_JOIN_TIMEOUT_MS = 30000;
+
+    private BrokerService service;
+    private String brokerUrlString;
+
+    /**
+     * Starts a broker whose default destination policy has a memory limit of 1, VM pending
+     * message storage and producer flow control, reachable over TCP on an ephemeral port.
+     */
+    @Before
+    public void setUp() throws Exception {
+        service = new BrokerService();
+        service.setDeleteAllMessagesOnStartup(true);
+        service.setUseJmx(false);
+
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policy = new PolicyEntry();
+        policy.setMemoryLimit(1);
+        policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
+        policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
+        policy.setProducerFlowControl(true);
+        policyMap.setDefaultEntry(policy);
+        service.setDestinationPolicy(policyMap);
+
+        service.setAdvisorySupport(false);
+        brokerUrlString = service.addConnector("tcp://localhost:0").getPublishableConnectString();
+        service.start();
+        service.waitUntilStarted();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (service != null) {
+            service.stop();
+            service.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Publishes until the publishing thread stops making progress, closes the producer from
+     * a second thread and verifies that both threads finish within the timeout.
+     *
+     * @throws Exception on any JMS failure or if the test thread is interrupted
+     */
+    @Test
+    public void testCloseWhenHunk() throws Exception {
+
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrlString);
+        connectionFactory.setProducerWindowSize(1024);
+
+        // TINY QUEUE is flow controlled after 1024 bytes
+        final ActiveMQDestination destination =
+            ActiveMQDestination.createDestination("queue://TINY_QUEUE", (byte) 0xff);
+
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final MessageProducer producer = session.createProducer(destination);
+        producer.setTimeToLive(0);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        final AtomicReference<Exception> publishException = new AtomicReference<Exception>(null);
+        final AtomicReference<Exception> closeException = new AtomicReference<Exception>(null);
+        // timestamp of the publisher's most recent loop iteration
+        final AtomicLong lastLoop = new AtomicLong(System.currentTimeMillis() + 100);
+
+        Thread pubThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    byte[] data = new byte[1000];
+                    new Random(0xdeadbeef).nextBytes(data);
+                    for (int i = 0; i < 10000; i++) {
+                        lastLoop.set(System.currentTimeMillis());
+                        ObjectMessage objMsg = session.createObjectMessage();
+                        objMsg.setObject(data);
+                        producer.send(destination, objMsg);
+                    }
+                } catch (Exception e) {
+                    publishException.set(e);
+                }
+            }
+        }, "PublishingThread");
+        pubThread.start();
+
+        // wait for publisher to deadlock: no loop iteration for two seconds
+        while (System.currentTimeMillis() - lastLoop.get() < 2000) {
+            Thread.sleep(100);
+        }
+        LOG.info("Publisher deadlock detected.");
+
+        Thread closeThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    LOG.info("Attempting close..");
+                    producer.close();
+                } catch (Exception e) {
+                    closeException.set(e);
+                }
+            }
+        }, "ClosingThread");
+        closeThread.start();
+
+        // Thread.join(timeout) returns normally when the timeout elapses, so a hung thread
+        // has to be detected through isAlive() rather than through an exception.
+        closeThread.join(THREAD_JOIN_TIMEOUT_MS);
+        pubThread.join(THREAD_JOIN_TIMEOUT_MS);
+        boolean closeHung = closeThread.isAlive();
+        boolean publishHung = pubThread.isAlive();
+
+        // Only close the connection when neither thread is stuck inside the client; otherwise
+        // leave clean-up to the broker shutdown in tearDown() so the test fails, not hangs.
+        if (!closeHung && !publishHung) {
+            connection.close();
+        }
+
+        assertFalse("Closing thread didn't complete in 30 seconds", closeHung);
+        assertFalse("Publishing thread didn't complete in 30 seconds", publishHung);
+
+        assertNull(closeException.get());
+        assertNotNull(publishException.get());
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java
new file mode 100644
index 0000000..13e1cfb
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4368Test.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4368Test {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AMQ4368Test.class);
+
+    private BrokerService broker;
+    private ActiveMQConnectionFactory connectionFactory;
+    private final Destination destination = new 
ActiveMQQueue("large_message_queue");
+    private String connectionUri;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = createBroker();
+        connectionUri = 
broker.addConnector("tcp://localhost:0").getPublishableConnectString();
+        broker.start();
+        connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+
+        PolicyEntry policy = new PolicyEntry();
+        policy.setUseCache(false);
+        broker.setDestinationPolicy(new PolicyMap());
+        broker.getDestinationPolicy().setDefaultEntry(policy);
+
+        KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
+        kahadb.setCheckForCorruptJournalFiles(true);
+        kahadb.setCleanupInterval(1000);
+
+        kahadb.deleteAllMessages();
+        broker.setPersistenceAdapter(kahadb);
+        broker.getSystemUsage().getMemoryUsage().setLimit(1024*1024*100);
+        broker.setUseJmx(false);
+
+        return broker;
+    }
+
+    abstract class Client implements Runnable   {
+        private final String name;
+        final AtomicBoolean done = new AtomicBoolean();
+        CountDownLatch startedLatch;
+        CountDownLatch doneLatch = new CountDownLatch(1);
+        Connection connection;
+        Session session;
+        final AtomicLong size = new AtomicLong();
+
+        Client(String name, CountDownLatch startedLatch) {
+            this.name = name;
+            this.startedLatch = startedLatch;
+        }
+
+        public void start() {
+            LOG.info("Starting: " + name);
+            new Thread(this, name).start();
+        }
+
+        public void stopAsync() {
+            done.set(true);
+        }
+
+        public void stop() throws InterruptedException {
+            stopAsync();
+            if (!doneLatch.await(20, TimeUnit.MILLISECONDS)) {
+                try {
+                    connection.close();
+                    doneLatch.await();
+                } catch (Exception e) {
+                }
+            }
+        }
+
+        @Override
+        public void run() {
+            try {
+                connection = createConnection();
+                connection.start();
+                try {
+                    session = createSession();
+                    work();
+                } finally {
+                    try {
+                        connection.close();
+                    } catch (JMSException ignore) {
+                    }
+                    LOG.info("Stopped: " + name);
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+                done.set(true);
+            } finally {
+                doneLatch.countDown();
+            }
+        }
+
+        protected Session createSession() throws JMSException {
+            return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        }
+
+        protected Connection createConnection() throws JMSException {
+            return connectionFactory.createConnection();
+        }
+
+        abstract protected void work() throws Exception;
+    }
+
+    class ProducingClient extends Client {
+
+        ProducingClient(String name, CountDownLatch startedLatch) {
+            super(name, startedLatch);
+        }
+
+        private String createMessage() {
+            StringBuffer stringBuffer = new StringBuffer();
+            for (long i = 0; i < 1000000; i++) {
+                stringBuffer.append("1234567890");
+            }
+            return stringBuffer.toString();
+        }
+
+        @Override
+        protected void work() throws Exception {
+            String data = createMessage();
+            MessageProducer producer = session.createProducer(destination);
+            startedLatch.countDown();
+            while (!done.get()) {
+                producer.send(session.createTextMessage(data));
+                long i = size.incrementAndGet();
+                if ((i % 1000) == 0) {
+                    LOG.info("produced " + i + ".");
+                }
+            }
+        }
+    }
+
+    class ConsumingClient extends Client {
+        public ConsumingClient(String name, CountDownLatch startedLatch) {
+            super(name, startedLatch);
+        }
+
+        @Override
+        protected void work() throws Exception {
+            MessageConsumer consumer = session.createConsumer(destination);
+            startedLatch.countDown();
+            while (!done.get()) {
+                Message msg = consumer.receive(100);
+                if (msg != null) {
+                    size.incrementAndGet();
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testENTMQ220() throws Exception {
+        LOG.info("Start test.");
+        CountDownLatch producer1Started = new CountDownLatch(1);
+        CountDownLatch producer2Started = new CountDownLatch(1);
+        CountDownLatch listener1Started = new CountDownLatch(1);
+
+        final ProducingClient producer1 = new ProducingClient("1", 
producer1Started);
+        final ProducingClient producer2 = new ProducingClient("2", 
producer2Started);
+        final ConsumingClient listener1 = new ConsumingClient("subscriber-1", 
listener1Started);
+        final AtomicLong lastSize = new AtomicLong();
+
+        try {
+
+            producer1.start();
+            producer2.start();
+            listener1.start();
+
+            producer1Started.await(15, TimeUnit.SECONDS);
+            producer2Started.await(15, TimeUnit.SECONDS);
+            listener1Started.await(15, TimeUnit.SECONDS);
+
+            lastSize.set(listener1.size.get());
+            for (int i = 0; i < 10; i++) {
+                Wait.waitFor(new Wait.Condition() {
+
+                    @Override
+                    public boolean isSatisified() throws Exception {
+                        return listener1.size.get() > lastSize.get();
+                    }
+                });
+                long size = listener1.size.get();
+                LOG.info("Listener 1: consumed: " + (size - lastSize.get()));
+                assertTrue("No messages received on iteration: " + i, size > 
lastSize.get());
+                lastSize.set(size);
+            }
+        } finally {
+            LOG.info("Stopping clients");
+            producer1.stop();
+            producer2.stop();
+            listener1.stop();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4407Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4407Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4407Test.java
new file mode 100644
index 0000000..73d6d69
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4407Test.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Verifies that a queue stored in a multi-KahaDB persistence adapter can be
+ * removed through the broker admin view and then re-created by sending to it
+ * again, with the re-created queue's store holding exactly the new message.
+ */
+public class AMQ4407Test {
+
+    static final Logger LOG = LoggerFactory.getLogger(AMQ4407Test.class);
+    private final static int maxFileLength = 1024*1024*32;
+
+    private final static String PREFIX_DESTINATION_NAME = "queue";
+
+    private final static String DESTINATION_NAME = PREFIX_DESTINATION_NAME + ".test";
+    private final static String DESTINATION_NAME_2 = PREFIX_DESTINATION_NAME + "2.test";
+    private final static String DESTINATION_NAME_3 = PREFIX_DESTINATION_NAME + "3.test";
+
+    BrokerService broker;
+
+    @Before
+    public void setUp() throws Exception {
+        prepareBrokerWithMultiStore(true);
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        // setUp() may have failed before the broker was created
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Creates (but does not start) a JMX-enabled broker named "localhost"
+     * that uses the given persistence adapter.
+     *
+     * @param kaha the persistence adapter to install
+     * @return the configured broker
+     * @throws Exception if the broker cannot be configured
+     */
+    protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setUseJmx(true);
+        broker.setBrokerName("localhost");
+        broker.setPersistenceAdapter(kaha);
+        return broker;
+    }
+
+    // NOTE(review): despite the method name, the broker is never restarted
+    // here; only queue removal and re-creation are exercised.
+    @Test
+    public void testRestartAfterQueueDelete() throws Exception {
+
+        // Ensure we have an Admin View.
+        assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return (broker.getAdminView()) != null;
+            }
+        }));
+
+
+        LOG.info("Adding destinations: {}, {}, {}", new Object[] {DESTINATION_NAME, DESTINATION_NAME_2, DESTINATION_NAME_3});
+        sendMessage(DESTINATION_NAME, "test 1");
+        sendMessage(DESTINATION_NAME_2, "test 1");
+        sendMessage(DESTINATION_NAME_3, "test 1");
+
+        assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME)));
+        assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2)));
+        assertNotNull(broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_3)));
+
+
+        LOG.info("Removing destination: {}", DESTINATION_NAME_2);
+        broker.getAdminView().removeQueue(DESTINATION_NAME_2);
+
+        LOG.info("Recreating destination: {}", DESTINATION_NAME_2);
+        sendMessage(DESTINATION_NAME_2, "test 1");
+
+        Destination destination2 = broker.getDestination(new ActiveMQQueue(DESTINATION_NAME_2));
+        assertNotNull(destination2);
+        // only the message sent after the removal should be in the store
+        assertEquals(1, destination2.getMessageStore().getMessageCount());
+    }
+
+    /**
+     * Creates a KahaDB store with a 32 MB journal file length and a 5 second
+     * cleanup interval.
+     *
+     * @param delete whether to wipe any existing messages in the store
+     * @return the configured adapter
+     * @throws IOException if existing messages cannot be deleted
+     */
+    protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
+        KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
+        kaha.setJournalMaxFileLength(maxFileLength);
+        kaha.setCleanupInterval(5000);
+        if (delete) {
+            kaha.deleteAllMessages();
+        }
+        return kaha;
+    }
+
+    /**
+     * Assigns {@link #broker} a broker backed by a multi-KahaDB adapter with
+     * three filtered stores: one for "queue.&gt;", one for "queue2.&gt;" and
+     * one with no destination filter set (presumably the catch-all store).
+     *
+     * @param deleteAllMessages whether to wipe existing messages from all stores
+     * @throws Exception if the stores or the broker cannot be created
+     */
+    public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception {
+
+        MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
+        if (deleteAllMessages) {
+            multiKahaDBPersistenceAdapter.deleteAllMessages();
+        }
+        ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<FilteredKahaDBPersistenceAdapter>();
+
+        adapters.add(createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME, deleteAllMessages));
+        adapters.add(createFilteredKahaDBByDestinationPrefix(PREFIX_DESTINATION_NAME + "2", deleteAllMessages));
+        adapters.add(createFilteredKahaDBByDestinationPrefix(null, deleteAllMessages));
+
+        multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
+        broker = createBroker(multiKahaDBPersistenceAdapter);
+    }
+
+    /**
+     * Create filtered KahaDB adapter by destination prefix.
+     *
+     * @param destinationPrefix queue name prefix; the filter becomes
+     *        {@code destinationPrefix + ".>"}. When {@code null} no queue
+     *        filter is set on the adapter.
+     * @param deleteAllMessages whether to wipe the underlying store
+     * @return the filtered adapter wrapping a fresh KahaDB store
+     * @throws IOException if the underlying store cannot be prepared
+     */
+    private FilteredKahaDBPersistenceAdapter createFilteredKahaDBByDestinationPrefix(String destinationPrefix, boolean deleteAllMessages)
+            throws IOException {
+        FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter();
+        template.setPersistenceAdapter(createStore(deleteAllMessages));
+        if (destinationPrefix != null) {
+            template.setQueue(destinationPrefix + ".>");
+        }
+        return template;
+    }
+
+
+    /**
+     * Send message to particular destination over a short-lived in-VM
+     * connection that uses synchronous sends.
+     *
+     * @param destinationName name of the queue to send to
+     * @param message text body of the message
+     * @throws JMSException if connecting or sending fails
+     */
+    private void sendMessage(String destinationName, String message) throws JMSException {
+        ActiveMQConnectionFactory f = new ActiveMQConnectionFactory("vm://localhost");
+        f.setAlwaysSyncSend(true);
+        Connection c = f.createConnection();
+        try {
+            c.start();
+            Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = s.createProducer(new ActiveMQQueue(destinationName));
+            producer.send(s.createTextMessage(message));
+        } finally {
+            // close (not merely stop) the connection so it is not leaked;
+            // closing a connection also closes its sessions and producers
+            c.close();
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4413Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4413Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4413Test.java
new file mode 100644
index 0000000..ff973e9
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4413Test.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+/**
+ * Repeatedly publishes a numbered sequence of persistent messages to a topic
+ * while a durable subscriber, which disconnects and reconnects after every
+ * {@code numMsgsTriggeringReconnection} messages, checks that it receives
+ * every sequence number in order.
+ */
+public class AMQ4413Test {
+
+    static final Logger LOG = LoggerFactory.getLogger(AMQ4413Test.class);
+
+    final String brokerUrl = "tcp://localhost:0";
+    private String connectionUri;
+    final int numMsgsTriggeringReconnection = 2;
+    final int numMsgs = 30;
+    final int numTests = 75;
+    final ExecutorService threadPool = Executors.newCachedThreadPool();
+
+    @Test
+    public void testDurableSubMessageLoss() throws Exception{
+        // start embedded broker
+        BrokerService brokerService = new BrokerService();
+        connectionUri = brokerService.addConnector(brokerUrl).getPublishableConnectString();
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(false);
+        brokerService.setKeepDurableSubsActive(true);
+        brokerService.setAdvisorySupport(false);
+        brokerService.start();
+        LOG.info("##### broker started");
+
+        // repeat the scenario numTests times
+        try {
+            for (int i = 0; i < numTests; ++i) {
+                LOG.info("##### test " + i + " started");
+                test();
+            }
+
+            LOG.info("##### tests are done");
+        } catch (Exception e) {
+            // rethrow so that a failing publisher/subscriber task fails the
+            // test instead of being silently swallowed
+            LOG.error("##### tests failed!", e);
+            throw e;
+        } finally {
+            threadPool.shutdown();
+            brokerService.stop();
+            LOG.info("##### broker stopped");
+        }
+    }
+
+    /**
+     * Runs one round: creates a durable subscription on a fresh topic, then
+     * runs a publisher and a reconnecting durable subscriber concurrently and
+     * asserts that both report success.
+     *
+     * @throws Exception if either task throws or a JMS operation fails
+     */
+    private void test() throws Exception {
+
+        final String topicName = "topic-" + UUID.randomUUID();
+        final String clientId = "client-" + UUID.randomUUID();
+        final String subName = "sub-" + UUID.randomUUID();
+
+        // create (and only create) subscription first
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+        factory.setWatchTopicAdvisories(false);
+        Connection connection = factory.createConnection();
+        connection.setClientID(clientId);
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic(topicName);
+        TopicSubscriber durableSubscriptionCreator = session.createDurableSubscriber(topic, subName);
+
+        connection.stop();
+        durableSubscriptionCreator.close();
+        session.close();
+        connection.close();
+
+        // publisher task
+        Callable<Boolean> publisher = new Callable<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+                Connection connection = null;
+
+                try {
+                    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+                    factory.setWatchTopicAdvisories(false);
+                    connection = factory.createConnection();
+                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    Topic topic = session.createTopic(topicName);
+
+                    MessageProducer producer = session.createProducer(topic);
+                    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+                    producer.setPriority(Message.DEFAULT_PRIORITY);
+                    producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
+
+                    // message bodies are the sequence numbers 1..numMsgs
+                    for (int seq = 1; seq <= numMsgs; ++seq) {
+                        TextMessage msg = session.createTextMessage(String.valueOf(seq));
+                        producer.send(msg);
+                        LOG.info("pub sent msg: " + seq);
+                        Thread.sleep(1L);
+                    }
+
+                    LOG.info("pub is done");
+                } finally {
+                    if (connection != null) {
+                        try {
+                            connection.close();
+                        } catch (JMSException e) {
+                            LOG.warn("pub failed to close connection", e);
+                        }
+                    }
+                }
+                return Boolean.TRUE;
+            }
+        };
+
+        // subscriber task
+        Callable<Boolean> durableSubscriber = new Callable<Boolean>() {
+            ActiveMQConnectionFactory factory;
+            Connection connection;
+            Session session;
+            Topic topic;
+            TopicSubscriber consumer;
+
+            @Override
+            public Boolean call() throws Exception {
+                factory = new ActiveMQConnectionFactory(connectionUri);
+                factory.setWatchTopicAdvisories(false);
+
+                try {
+                    connect();
+
+                    for (int seqExpected = 1; seqExpected <= numMsgs; ++seqExpected) {
+                        TextMessage msg = (TextMessage) consumer.receive(3000L);
+                        if (msg == null) {
+                            LOG.info("expected: " + seqExpected + ", actual: timed out");
+                            return Boolean.FALSE;
+                        }
+
+                        int seq = Integer.parseInt(msg.getText());
+
+                        LOG.info("sub received msg: " + seq);
+
+                        if (seqExpected != seq) {
+                            LOG.info("expected: " + seqExpected + ", actual: " + seq);
+                            return Boolean.FALSE;
+                        }
+
+                        // drop and re-establish the durable subscriber's connection
+                        if (seq % numMsgsTriggeringReconnection == 0) {
+                            close(false);
+                            connect();
+
+                            LOG.info("sub reconnected");
+                        }
+                    }
+
+                    LOG.info("sub is done");
+                } finally {
+                    try {
+                        close(true);
+                    } catch (Exception e) {
+                        LOG.warn("sub failed to close cleanly", e);
+                    }
+                }
+
+                return Boolean.TRUE;
+            }
+
+            /** Opens a new connection with the fixed client id and re-attaches to the durable subscription. */
+            void connect() throws Exception {
+                connection = factory.createConnection();
+                connection.setClientID(clientId);
+                connection.start();
+
+                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                topic = session.createTopic(topicName);
+                consumer = session.createDurableSubscriber(topic, subName);
+            }
+
+            /**
+             * Stops and closes the consumer, session and connection.
+             *
+             * @param unsubscribe whether to remove the durable subscription as well
+             */
+            void close(boolean unsubscribe) throws Exception {
+                try {
+                    if (connection != null) {
+                        connection.stop();
+                    }
+
+                    if (consumer != null) {
+                        consumer.close();
+                    }
+
+                    if (session != null) {
+                        if (unsubscribe) {
+                            session.unsubscribe(subName);
+                        }
+                        session.close();
+                    }
+                } finally {
+                    // always release the connection, even if an earlier step failed
+                    if (connection != null) {
+                        connection.close();
+                    }
+                }
+            }
+        };
+
+        ArrayList<Future<Boolean>> results = new ArrayList<Future<Boolean>>();
+        results.add(threadPool.submit(publisher));
+        results.add(threadPool.submit(durableSubscriber));
+
+        for (Future<Boolean> result : results) {
+            assertTrue("Publisher or durable subscriber failed for " + topicName, result.get());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java
new file mode 100644
index 0000000..2f7ae69
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4469Test.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.jms.support.JmsUtils;
+
+public class AMQ4469Test {
+
+    private static final int maxConnections = 100;
+
+    private final ExecutorService executor = Executors.newCachedThreadPool();
+    private String connectionUri;
+    private BrokerService service;
+    private TransportConnector connector;
+
+    @Before
+    public void setUp() throws Exception {
+        service = new BrokerService();
+        service.setPersistent(false);
+        service.setUseJmx(false);
+        connector = 
service.addConnector("tcp://0.0.0.0:0?maximumConnections="+maxConnections);
+        connectionUri = connector.getPublishableConnectString();
+        service.start();
+        service.waitUntilStarted();
+    }
+
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(connectionUri);
+    }
+
+    @Test
+    public void testMaxConnectionControl() throws Exception {
+        final ConnectionFactory cf = createConnectionFactory();
+        final CountDownLatch startupLatch = new CountDownLatch(1);
+        for(int i = 0; i < maxConnections + 20; i++) {
+            executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    Connection conn = null;
+                    try {
+                        startupLatch.await();
+                        conn = cf.createConnection();
+                        conn.start();
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                        JmsUtils.closeConnection(conn);
+                    }
+                }
+            });
+        }
+
+        TcpTransportServer transportServer = 
(TcpTransportServer)connector.getServer();
+        // ensure the max connections is in effect
+        assertEquals(maxConnections, transportServer.getMaximumConnections());
+        // No connections at first
+        assertEquals(0, connector.getConnections().size());
+        // Release the latch to set up connections in parallel
+        startupLatch.countDown();
+        TimeUnit.SECONDS.sleep(5);
+
+        final TransportConnector connector = this.connector;
+
+        // Expect the max connections is created
+        assertTrue("Expected: " + maxConnections + " found: " + 
connector.getConnections().size(),
+            Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return connector.getConnections().size() == maxConnections;
+                }
+            })
+        );
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        executor.shutdown();
+
+        service.stop();
+        service.waitUntilStopped();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4472Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4472Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4472Test.java
new file mode 100644
index 0000000..42c391c
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4472Test.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Test for AMQ-4472: a message committed in a transacted session must still be
+ * delivered to a new consumer on that session after the first consumer has been
+ * closed while a second transaction is in progress.
+ */
+public class AMQ4472Test {
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4472Test.class);
+
+    /**
+     * Sends and commits "Message 1", sends "Message 2" in a new (uncommitted)
+     * transaction, replaces the consumer, and asserts that the new consumer
+     * receives "Message 1" within ten seconds.
+     *
+     * <p>Any unexpected exception fails the test instead of being printed and
+     * ignored, so a broken connection can no longer produce a false pass.
+     */
+    @Test
+    public void testLostMessage() {
+        Connection connection = null;
+        try {
+            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.useJmx=false");
+            connection = connectionFactory.createConnection();
+            connection.start();
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Destination test_data_destination = session.createQueue("test"+System.currentTimeMillis());
+
+            MessageConsumer consumer = session.createConsumer(test_data_destination);
+            LOG.info("Consumer 1 connected");
+
+            MessageProducer producer = session.createProducer(test_data_destination);
+            producer.send(session.createTextMessage("Message 1"));
+
+            // committing the session prior to the close
+            session.commit();
+
+            // starting a new transaction
+            producer.send(session.createTextMessage("Message 2"));
+
+            // in a new transaction, with prefetch>0, the message
+            // 1 will be pending till second commit
+            LOG.info("Closing consumer 1...");
+            consumer.close();
+
+            // create a consumer
+            consumer = session.createConsumer(test_data_destination);
+            LOG.info("Consumer 2 connected");
+
+            // retrieve message previously committed to tmp queue
+            Message message = consumer.receive(10000);
+            if (message != null) {
+                // The placeholder is required; without it SLF4J drops the argument.
+                LOG.info("Got message 1: {}", message);
+                assertEquals("expected message", "Message 1", ((TextMessage) message).getText());
+                session.commit();
+            } else {
+                LOG.error("Expected message but it never arrived");
+            }
+            assertNotNull(message);
+        } catch (Exception e) {
+            // Assertion failures are Errors and never reach this handler; anything
+            // caught here is an infrastructure problem that must fail the test.
+            throw new AssertionError("Unexpected exception while running testLostMessage", e);
+        } finally {
+            // connection is still null when createConnection() itself failed.
+            if (connection != null) {
+                try {
+                    connection.close();
+                } catch (JMSException ignored) {
+                    // Best-effort cleanup; the test verdict is already decided.
+                }
+            }
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4475Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4475Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4475Test.java
new file mode 100644
index 0000000..3d11cef
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4475Test.java
@@ -0,0 +1,344 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertFalse;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.util.TimeStampingBrokerPlugin;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for AMQ-4475: runs a broker configured with both the
+ * {@link TimeStampingBrokerPlugin} and an {@link IndividualDeadLetterStrategy}
+ * that processes expired messages, then pushes {@code NUM_MSGS} messages
+ * through a produce -> forward -> consume pipeline and checks that every
+ * message arrives at the final queue.
+ */
+public class AMQ4475Test {
+
+    private final Log LOG = LogFactory.getLog(AMQ4475Test.class);
+
+    private final int NUM_MSGS = 1000;
+    private final int MAX_THREADS = 20;
+
+    private BrokerService broker;
+    private String connectionUri;
+
+    private final ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS);
+    private final ActiveMQQueue original = new ActiveMQQueue("jms/AQueue");
+    private final ActiveMQQueue rerouted = new ActiveMQQueue("jms/AQueue_proxy");
+
+    /**
+     * Starts a non-persistent broker on an ephemeral TCP port with the
+     * time-stamping plugin and an individual dead-letter strategy installed as
+     * the default destination policy.
+     *
+     * @throws Exception if the broker cannot be configured or started
+     */
+    @Before
+    public void setUp() throws Exception {
+        TimeStampingBrokerPlugin tsbp = new TimeStampingBrokerPlugin();
+        tsbp.setZeroExpirationOverride(432000000);
+        tsbp.setTtlCeiling(432000000);
+        tsbp.setFutureOnly(true);
+
+        broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(true);
+        broker.setPlugins(new BrokerPlugin[] {tsbp});
+        connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
+
+        // Configure Dead Letter Strategy
+        IndividualDeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
+        strategy.setProcessExpired(true);
+        strategy.setUseQueueForQueueMessages(true);
+        strategy.setQueuePrefix("DLQ.");
+        strategy.setProcessNonPersistent(true);
+
+        // Add policy and individual DLQ strategy
+        PolicyEntry policy = new PolicyEntry();
+        policy.setTimeBeforeDispatchStarts(3000);
+        policy.setDeadLetterStrategy(strategy);
+
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+
+        broker.setDestinationPolicy(pMap);
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    /**
+     * Stops the producer pool and the broker.
+     *
+     * @throws Exception if the broker fails to stop
+     */
+    @After
+    public void after() throws Exception {
+        // The test only shuts the pool down on its success path; doing it here
+        // as well keeps worker threads from outliving a failed test.
+        executor.shutdownNow();
+
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Produces {@code NUM_MSGS} messages to the original queue, forwards them
+     * to the rerouted queue, and consumes them from there. The test fails if
+     * either consumer thread reports a failure or is still running after its
+     * two-minute join timeout.
+     */
+    @Test
+    public void testIndividualDeadLetterAndTimeStampPlugin() {
+        LOG.info("Starting test ..");
+
+        long startTime = System.nanoTime();
+
+        // Produce to network
+        List<Future<ProducerTask>> tasks = new ArrayList<Future<ProducerTask>>();
+
+        for (int index = 0; index < 1; index++) {
+            ProducerTask p = new ProducerTask(connectionUri, original, NUM_MSGS);
+            Future<ProducerTask> future = executor.submit(p, p);
+            tasks.add(future);
+        }
+
+        ForwardingConsumerThread f1 = new ForwardingConsumerThread(original, rerouted, NUM_MSGS);
+        f1.start();
+        ConsumerThread c1 = new ConsumerThread(connectionUri, rerouted, NUM_MSGS);
+        c1.start();
+
+        LOG.info("Waiting on consumers and producers to exit");
+
+        try {
+            for (Future<ProducerTask> future : tasks) {
+                ProducerTask e = future.get();
+                LOG.info("[Completed] " + e.dest.getPhysicalName());
+            }
+            executor.shutdown();
+            LOG.info("Producing threads complete, waiting on ACKs");
+            f1.join(TimeUnit.MINUTES.toMillis(2));
+            c1.join(TimeUnit.MINUTES.toMillis(2));
+        } catch (ExecutionException e) {
+            LOG.warn("Caught unexpected exception", e);
+            throw new RuntimeException(e);
+        } catch (InterruptedException ie) {
+            LOG.warn("Caught unexpected exception", ie);
+            throw new RuntimeException(ie);
+        }
+
+        // join(timeout) returns silently on timeout, so a thread that is still
+        // running has not finished its work and must count as a failure.
+        assertFalse("Forwarding consumer did not finish in time", f1.isAlive());
+        assertFalse("Consumer did not finish in time", c1.isAlive());
+
+        assertFalse(f1.isFailed());
+        assertFalse(c1.isFailed());
+
+        long estimatedTime = System.nanoTime() - startTime;
+
+        LOG.info("Testcase duration (seconds): " + estimatedTime / 1000000000.0);
+        LOG.info("Consumers and producers exited, all msgs received as expected");
+    }
+
+    /**
+     * Closes the given connection if it was ever created, logging (rather than
+     * propagating) any failure so cleanup never masks the real outcome.
+     */
+    private void closeQuietly(Connection connection) {
+        if (connection == null) {
+            return;
+        }
+        try {
+            connection.close();
+        } catch (Throwable e) {
+            LOG.warn("Caught unexpected exception", e);
+        }
+    }
+
+    /**
+     * Sends {@code count} persistent text messages to {@code dest} over a
+     * connection created from {@code uri}. Exceptions are logged and not
+     * rethrown.
+     */
+    public class ProducerTask implements Runnable {
+        private final String uri;
+        private final ActiveMQQueue dest;
+        private final int count;
+
+        public ProducerTask(String uri, ActiveMQQueue dest, int count) {
+            this.uri = uri;
+            this.dest = dest;
+            this.count = count;
+        }
+
+        @Override
+        public void run() {
+
+            Connection connection = null;
+            try {
+                String destName = "";
+
+                try {
+                    destName = dest.getQueueName();
+                } catch (JMSException e) {
+                    LOG.warn("Caught unexpected exception", e);
+                }
+
+                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uri);
+
+                connection = connectionFactory.createConnection();
+
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageProducer producer = session.createProducer(dest);
+                connection.start();
+
+                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+                String msg = "Test Message";
+
+                for (int i = 0; i < count; i++) {
+                    producer.send(session.createTextMessage(msg + dest.getQueueName() + " " + i));
+                }
+
+                LOG.info("[" + destName + "] Sent " + count + " msgs");
+            } catch (Exception e) {
+                LOG.warn("Caught unexpected exception", e);
+            } finally {
+                closeQuietly(connection);
+            }
+        }
+    }
+
+    /**
+     * Consumes up to {@code total} messages from one queue over a
+     * {@code vm://localhost} connection and re-sends each to a second queue.
+     * {@link #isFailed()} reports whether a receive timed out or an exception
+     * occurred.
+     */
+    public class ForwardingConsumerThread extends Thread {
+
+        private final ActiveMQQueue original;
+        private final ActiveMQQueue forward;
+        private int blockSize = 0;
+        private final int PARALLEL = 1;
+        // volatile: written by this thread, read by the test thread, which may
+        // get here after a timed-out join() with no other ordering guarantee.
+        private volatile boolean failed;
+
+        public ForwardingConsumerThread(ActiveMQQueue original, ActiveMQQueue forward, int total) {
+            this.original = original;
+            this.forward = forward;
+            this.blockSize = total / PARALLEL;
+        }
+
+        public boolean isFailed() {
+            return failed;
+        }
+
+        @Override
+        public void run() {
+            Connection connection = null;
+            try {
+
+                for (int index = 0; index < PARALLEL; index++) {
+
+                    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+
+                    connection = factory.createConnection();
+                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageConsumer consumer = session.createConsumer(original);
+                    MessageProducer producer = session.createProducer(forward);
+                    connection.start();
+                    int count = 0;
+
+                    while (count < blockSize) {
+
+                        Message msg1 = consumer.receive(10000);
+                        if (msg1 != null) {
+                            if (msg1 instanceof ActiveMQTextMessage) {
+                                if (count % 100 == 0) {
+                                    LOG.info("Consuming -> " + ((ActiveMQTextMessage) msg1).getDestination() + " count=" + count);
+                                }
+
+                                producer.send(msg1);
+
+                                count++;
+                            } else {
+                                LOG.info("Skipping unknown msg type " + msg1);
+                            }
+                        } else {
+                            // Timed out before the whole block arrived; record
+                            // it the same way ConsumerThread does.
+                            failed = true;
+                            break;
+                        }
+                    }
+
+                    LOG.info("[" + original.getQueueName() + "] completed segment (" + index + " of " + blockSize + ")");
+                    connection.close();
+                }
+            } catch (Exception e) {
+                failed = true;
+                LOG.warn("Caught unexpected exception", e);
+            } finally {
+                LOG.debug(getName() + ": is stopping");
+                closeQuietly(connection);
+            }
+        }
+    }
+
+    /**
+     * Consumes up to {@code total} messages from {@code dest} over a connection
+     * created from {@code uri}. {@link #isFailed()} reports whether a receive
+     * timed out or an exception occurred.
+     */
+    public class ConsumerThread extends Thread {
+
+        private final String uri;
+        private final ActiveMQQueue dest;
+        private int blockSize = 0;
+        private final int PARALLEL = 1;
+        // volatile: written by this thread, read by the test thread, which may
+        // get here after a timed-out join() with no other ordering guarantee.
+        private volatile boolean failed;
+
+        public ConsumerThread(String uri, ActiveMQQueue dest, int total) {
+            this.uri = uri;
+            this.dest = dest;
+            this.blockSize = total / PARALLEL;
+        }
+
+        public boolean isFailed() {
+            return failed;
+        }
+
+        @Override
+        public void run() {
+            Connection connection = null;
+            try {
+
+                for (int index = 0; index < PARALLEL; index++) {
+
+                    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
+
+                    connection = factory.createConnection();
+                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    MessageConsumer consumer = session.createConsumer(dest);
+                    connection.start();
+                    int count = 0;
+
+                    while (count < blockSize) {
+
+                        Message msg1 = consumer.receive(10000);
+                        if (msg1 != null) {
+                            if (msg1 instanceof ActiveMQTextMessage) {
+                                if (count % 100 == 0) {
+                                    LOG.info("Consuming -> " + ((ActiveMQTextMessage) msg1).getDestination() + " count=" + count);
+                                }
+
+                                count++;
+                            } else {
+                                LOG.info("Skipping unknown msg type " + msg1);
+                            }
+                        } else {
+                            failed = true;
+                            break;
+                        }
+                    }
+
+                    LOG.info("[" + dest.getQueueName() + "] completed segment (" + index + " of " + blockSize + ")");
+                    connection.close();
+                }
+            } catch (Exception e) {
+                failed = true;
+                LOG.warn("Caught unexpected exception", e);
+            } finally {
+                LOG.debug(getName() + ": is stopping");
+                closeQuietly(connection);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitLevelDBTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitLevelDBTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitLevelDBTest.java
new file mode 100644
index 0000000..5a48540
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitLevelDBTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.File;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.leveldb.LevelDBStore;
+
+/**
+ * Variant of {@link AMQ4485LowLimitTest} that sets {@code numBrokers} to 2 and
+ * gives every broker it creates a {@link LevelDBStore} persistence adapter.
+ */
+public class AMQ4485LowLimitLevelDBTest extends AMQ4485LowLimitTest {
+
+    /** Sets the inherited {@code numBrokers} field to 2. */
+    public AMQ4485LowLimitLevelDBTest() {
+        numBrokers = 2;
+    }
+
+    /**
+     * Creates the broker through the superclass, then replaces its persistence
+     * adapter with a LevelDB store located in a {@code levelDB} directory
+     * beneath the broker's data directory.
+     *
+     * @param brokerid     passed through to the superclass
+     * @param addToNetwork passed through to the superclass
+     * @return the broker created by the superclass, now using the LevelDB store
+     * @throws Exception if the superclass fails to create the broker
+     */
+    protected BrokerService createBroker(int brokerid, boolean addToNetwork) throws Exception {
+        final BrokerService created = super.createBroker(brokerid, addToNetwork);
+
+        final LevelDBStore store = new LevelDBStore();
+        final File storeDirectory = new File(created.getBrokerDataDirectory(), "levelDB");
+        store.setDirectory(storeDirectory);
+
+        created.setPersistenceAdapter(store);
+        return created;
+    }
+}
\ No newline at end of file

Reply via email to