http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2645Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2645Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2645Test.java
new file mode 100644
index 0000000..2ac6ff3
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2645Test.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ2645Test extends EmbeddedBrokerTestSupport {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AMQ2645Test.class);
+    private final static String QUEUE_NAME = "test.daroo.q";
+
+    public void testWaitForTransportInterruptionProcessingHang()
+            throws Exception {
+        final ConnectionFactory fac = new ActiveMQConnectionFactory(
+                "failover:(" + this.bindAddress + ")");
+        final Connection connection = fac.createConnection();
+        try {
+            final Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            final Queue queue = session.createQueue(QUEUE_NAME);
+            final MessageProducer producer = session.createProducer(queue);
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            connection.start();
+
+            producer.send(session.createTextMessage("test"));
+
+            final CountDownLatch afterRestart = new CountDownLatch(1);
+            final CountDownLatch twoNewMessages = new CountDownLatch(1);
+            final CountDownLatch thirdMessageReceived = new CountDownLatch(1);
+
+            final MessageConsumer consumer = 
session.createConsumer(session.createQueue(QUEUE_NAME));
+            consumer.setMessageListener(new MessageListener() {
+                public void onMessage(Message message) {
+                    try {
+                        afterRestart.await();
+
+                        final TextMessage txtMsg = (TextMessage) message;
+                        if (txtMsg.getText().equals("test")) {
+                            producer.send(session.createTextMessage("test 1"));
+                            TimeUnit.SECONDS.sleep(5);
+                            // THIS SECOND send() WILL CAUSE CONSUMER DEADLOCK
+                            producer.send(session.createTextMessage("test 2"));
+                            LOG.info("Two new messages produced.");
+                            twoNewMessages.countDown();
+                        } else if (txtMsg.getText().equals("test 3")) {
+                            thirdMessageReceived.countDown();
+                        }
+                    } catch (Exception e) {
+                        LOG.error(e.toString());
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+
+            LOG.info("Stopping broker....");
+            broker.stop();
+
+            LOG.info("Creating new broker...");
+            broker = createBroker();
+            startBroker();
+            broker.waitUntilStarted();
+
+            afterRestart.countDown();
+            assertTrue("Consumer is deadlocked!", twoNewMessages.await(60, 
TimeUnit.SECONDS));
+
+            producer.send(session.createTextMessage("test 3"));
+            assertTrue("Consumer got third message after block", 
thirdMessageReceived.await(60, TimeUnit.SECONDS));
+
+        } finally {
+            broker.stop();
+        }
+
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        bindAddress = "tcp://0.0.0.0:61617";
+        super.setUp();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java
new file mode 100644
index 0000000..e584572
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2736Test.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.util.DefaultIOExceptionHandler;
+import org.junit.After;
+import org.junit.Test;
+
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class AMQ2736Test {
+    BrokerService broker;
+
+    @Test
+    public void testRollbackOnRecover() throws Exception {
+        broker = createAndStartBroker(true);
+        DefaultIOExceptionHandler ignoreAllExceptionsIOExHandler = new 
DefaultIOExceptionHandler();
+        ignoreAllExceptionsIOExHandler.setIgnoreAllErrors(true);
+        broker.setIoExceptionHandler(ignoreAllExceptionsIOExHandler);
+
+        ActiveMQConnectionFactory f = new 
ActiveMQConnectionFactory("vm://localhost?async=false");
+        f.setAlwaysSyncSend(true);
+        Connection c = f.createConnection();
+        c.start();
+        Session s = c.createSession(true, Session.SESSION_TRANSACTED);
+        MessageProducer p = s.createProducer(new ActiveMQQueue("Tx"));
+        p.send(s.createTextMessage("aa"));
+
+        // kill journal without commit
+        KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter) 
broker.getPersistenceAdapter();
+        KahaDBStore store = pa.getStore();
+
+        assertNotNull("last tx location is present " + 
store.getInProgressTxLocationRange()[1]);
+
+        // test hack, close the journal to ensure no further journal updates 
when broker stops
+        // mimic kill -9 in terms of no normal shutdown sequence
+        store.getJournal().close();
+        try {
+            store.close();
+        } catch (Exception expectedLotsAsJournalBorked) {
+        }
+
+        broker.stop();
+        broker.waitUntilStopped();
+
+        // restart with recovery
+        broker = createAndStartBroker(false);
+
+        pa = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+        store = pa.getStore();
+
+        // inflight non xa tx should be rolledback on recovery
+        assertNull("in progress tx location is present ", 
store.getInProgressTxLocationRange()[0]);
+
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    private BrokerService createAndStartBroker(boolean deleteAll) throws 
Exception {
+        BrokerService broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(deleteAll);
+        broker.setUseJmx(false);
+        broker.getManagementContext().setCreateConnector(false);
+        broker.start();
+        return broker;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2751Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2751Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2751Test.java
new file mode 100644
index 0000000..b52f5c0
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2751Test.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ2751Test extends EmbeddedBrokerTestSupport {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AMQ2751Test.class);
+
+    private static String clientIdPrefix = "consumer";
+    private static String queueName = "FOO";
+
+    public void testRecoverRedelivery() throws Exception {
+
+        final CountDownLatch redelivery = new CountDownLatch(6);
+        final ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(
+                "failover:(" + 
broker.getTransportConnectors().get(0).getConnectUri() + ")");
+        try {
+
+            Connection connection = factory.createConnection();
+            String clientId = clientIdPrefix;
+            connection.setClientID(clientId);
+
+            final Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+
+            Queue queue = session.createQueue(queueName);
+
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            consumer.setMessageListener(new MessageListener() {
+                public void onMessage(Message message) {
+                    try {
+                        LOG.info("Got message: " + message.getJMSMessageID());
+                        if (message.getJMSRedelivered()) {
+                            LOG.info("It's a redelivery.");
+                            redelivery.countDown();
+                        }
+                        LOG.info("calling recover() on the session to force 
redelivery.");
+                        session.recover();
+                    } catch (JMSException e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+
+            System.out.println("Created queue consumer with clientId " + 
clientId);
+            connection.start();
+
+            MessageProducer producer = session.createProducer(queue);
+            producer.send(session.createTextMessage("test"));
+
+            assertTrue("we got 6 redeliveries", redelivery.await(20, 
TimeUnit.SECONDS));
+
+        } finally {
+            broker.stop();
+        }
+
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        bindAddress = "tcp://localhost:0";
+        super.setUp();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java
new file mode 100644
index 0000000..a1d0bc1
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2801Test.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
+import 
org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.usage.SystemUsage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ2801Test
+{
+    private static final Logger LOG = 
LoggerFactory.getLogger(AMQ2801Test.class);
+
+    private static final String TOPICNAME = "InvalidPendingQueueTest";
+    private static final String SELECTOR1 = "JMS_ID" + " = '" + "TEST" + "'";
+    private static final String SELECTOR2 = "JMS_ID" + " = '" + "TEST2" + "'";
+    private static final String SUBSCRIPTION1 = "InvalidPendingQueueTest_1";
+    private static final String SUBSCRIPTION2 = "InvalidPendingQueueTest_2";
+    private static final int MSG_COUNT = 2500;
+    private Session session1;
+    private Connection conn1;
+    private Topic topic1;
+    private MessageConsumer consumer1;
+    private Session session2;
+    private Connection conn2;
+    private Topic topic2;
+    private MessageConsumer consumer2;
+    private BrokerService broker;
+    private String connectionUri;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setDataDirectory("target" + File.separator + "activemq-data");
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.setAdvisorySupport(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.addConnector("tcp://localhost:0").setName("Default");
+        applyMemoryLimitPolicy(broker);
+        broker.start();
+
+        connectionUri = 
broker.getTransportConnectors().get(0).getPublishableConnectString();
+    }
+
+    private void applyMemoryLimitPolicy(BrokerService broker) {
+        final SystemUsage memoryManager = new SystemUsage();
+        memoryManager.getMemoryUsage().setLimit(5818230784L);
+        memoryManager.getStoreUsage().setLimit(6442450944L);
+        memoryManager.getTempUsage().setLimit(3221225472L);
+        broker.setSystemUsage(memoryManager);
+
+        final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
+        final PolicyEntry entry = new PolicyEntry();
+        entry.setQueue(">");
+        entry.setProducerFlowControl(false);
+        entry.setMemoryLimit(504857608);
+        entry.setPendingQueuePolicy(new 
FilePendingQueueMessageStoragePolicy());
+        policyEntries.add(entry);
+
+        final PolicyMap policyMap = new PolicyMap();
+        policyMap.setPolicyEntries(policyEntries);
+        broker.setDestinationPolicy(policyMap);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        conn1.close();
+        conn2.close();
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    private void produceMessages() throws Exception {
+        TopicConnection connection = createConnection();
+        TopicSession session = connection.createTopicSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic(TOPICNAME);
+        TopicPublisher producer = session.createPublisher(topic);
+        connection.start();
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        long tStamp = System.currentTimeMillis();
+        BytesMessage message = session2.createBytesMessage();
+        for (int i = 1; i <= MSG_COUNT; i++)
+        {
+            message.setStringProperty("JMS_ID", "TEST");
+            message.setIntProperty("Type", i);
+            producer.publish(message);
+            if (i%100 == 0) {
+                LOG.info("sent: " + i + " @ " + ((System.currentTimeMillis() - 
tStamp) / 100)  + "m/ms");
+                tStamp = System.currentTimeMillis() ;
+            }
+        }
+    }
+
+    private void activeateSubscribers() throws Exception {
+        // First consumer
+        conn1 = createConnection();
+        conn1.setClientID(SUBSCRIPTION1);
+        session1 = conn1.createSession(true, Session.SESSION_TRANSACTED);
+        topic1 = session1.createTopic(TOPICNAME);
+        consumer1 = session1.createDurableSubscriber(topic1, SUBSCRIPTION1, 
SELECTOR1, false);
+        conn1.start();
+
+        // Second consumer that just exists
+        conn2 = createConnection();
+        conn2.setClientID(SUBSCRIPTION2);
+        session2 = conn2.createSession(true, Session.SESSION_TRANSACTED);
+        topic2 = session2.createTopic(TOPICNAME);
+        consumer2 = session2.createDurableSubscriber(topic2, SUBSCRIPTION2, 
SELECTOR2, false);
+        conn2.start();
+    }
+
+    @Test
+    public void testInvalidPendingQueue() throws Exception {
+
+        activeateSubscribers();
+
+        assertNotNull(consumer1);
+        assertNotNull(consumer2);
+
+        produceMessages();
+        LOG.debug("Sent messages to a single subscriber");
+        Thread.sleep(2000);
+
+        LOG.debug("Closing durable subscriber connections");
+        conn1.close();
+        conn2.close();
+        LOG.debug("Closed durable subscriber connections");
+
+        Thread.sleep(2000);
+        LOG.debug("Re-starting durable subscriber connections");
+
+        activeateSubscribers();
+        LOG.debug("Started up durable subscriber connections - now view 
activemq console to see pending queue size on the other subscriber");
+
+        ObjectName[] subs = broker.getAdminView().getDurableTopicSubscribers();
+
+        for (int i = 0; i < subs.length; i++) {
+            ObjectName subName = subs[i];
+            DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
+                broker.getManagementContext().newProxyInstance(subName, 
DurableSubscriptionViewMBean.class, true);
+
+            LOG.info(sub.getSubscriptionName() + ": pending = " + 
sub.getPendingQueueSize() + ", dispatched: " + sub.getDispatchedQueueSize());
+            if(sub.getSubscriptionName().equals(SUBSCRIPTION1)) {
+                assertEquals("Incorrect number of pending messages", 
MSG_COUNT, sub.getPendingQueueSize() + sub.getDispatchedQueueSize());
+            } else {
+                assertEquals("Incorrect number of pending messages", 0, 
sub.getPendingQueueSize());
+            }
+        }
+    }
+
+    private TopicConnection createConnection() throws Exception
+    {
+        ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory();
+        connectionFactory.setBrokerURL(connectionUri);
+        TopicConnection conn = connectionFactory.createTopicConnection();
+        return conn;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
new file mode 100644
index 0000000..22ad6ab
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
@@ -0,0 +1,380 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.leveldb.LevelDBStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.disk.journal.DataFile;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ2832Test {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AMQ2832Test.class);
+
+    BrokerService broker = null;
+    private ActiveMQConnectionFactory cf;
+    private final Destination destination = new ActiveMQQueue("AMQ2832Test");
+    private String connectionUri;
+
+    protected void startBroker() throws Exception {
+        doStartBroker(true, false);
+    }
+
+    protected void restartBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+        doStartBroker(false, false);
+    }
+
+    protected void recoverBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+        doStartBroker(false, true);
+    }
+
+    private void doStartBroker(boolean delete, boolean recover) throws 
Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(delete);
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.addConnector("tcp://localhost:0");
+
+        configurePersistence(broker, recover);
+
+        connectionUri = "vm://localhost?create=false";
+        cf = new ActiveMQConnectionFactory(connectionUri);
+
+        broker.start();
+        LOG.info("Starting broker..");
+    }
+
+    protected void configurePersistence(BrokerService brokerService, boolean 
recover) throws Exception {
+        KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) 
brokerService.getPersistenceAdapter();
+
+        // ensure there are a bunch of data files but multiple entries in each
+        adapter.setJournalMaxFileLength(1024 * 20);
+
+        // speed up the test case, checkpoint an cleanup early and often
+        adapter.setCheckpointInterval(5000);
+        adapter.setCleanupInterval(5000);
+
+        if (recover) {
+            adapter.setForceRecoverIndex(true);
+        }
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+   /**
+    * Scenario:
+    * db-1.log has an unacknowledged message,
+    * db-2.log contains acks for the messages from db-1.log,
+    * db-3.log contains acks for the messages from db-2.log
+    *
+    * Expected behavior: since db-1.log is blocked, db-2.log and db-3.log 
should not be removed during the cleanup.
+    * Current situation on 5.10.0, 5.10.1 is that db-3.log is removed causing 
all messages from db-2.log, whose acks were in db-3.log, to be replayed.
+    *
+    * @throws Exception
+    */
+    @Test
+    public void testAckChain() throws Exception {
+       startBroker();
+
+       StagedConsumer consumer = new StagedConsumer();
+       // file #1
+       produceMessagesToConsumeMultipleDataFiles(5);
+       // acknowledge first 2 messages and leave the 3rd one unacknowledged 
blocking db-1.log
+       consumer.receive(3);
+
+       // send messages by consuming and acknowledging every message right 
after sent in order to get KahadbAdd and Remove command to be saved together
+       // this is necessary in order to get KahaAddMessageCommand to be saved 
in one db file and the corresponding KahaRemoveMessageCommand in the next one
+       produceAndConsumeImmediately(20, consumer);
+       consumer.receive(2).acknowledge(); // consume and ack the last 2 
unconsumed
+
+       // now we have 3 files written and started with #4
+       consumer.close();
+
+       broker.stop();
+       broker.waitUntilStopped();
+
+       recoverBroker();
+
+       consumer = new StagedConsumer();
+       Message message = consumer.receive(1);
+       assertNotNull("One message stays unacked from db-1.log", message);
+       message.acknowledge();
+       message = consumer.receive(1);
+       assertNull("There should not be any unconsumed messages any more", 
message);
+       consumer.close();
+   }
+
+   private void produceAndConsumeImmediately(int numOfMsgs, StagedConsumer 
consumer) throws Exception {
+      for (int i = 0; i < numOfMsgs; i++) {
+         produceMessagesToConsumeMultipleDataFiles(1);
+         consumer.receive(1).acknowledge();
+      }
+   }
+
+   @Test
+    public void testAckRemovedMessageReplayedAfterRecovery() throws Exception {
+
+        startBroker();
+
+        StagedConsumer consumer = new StagedConsumer();
+        int numMessagesAvailable = 
produceMessagesToConsumeMultipleDataFiles(20);
+        // this will block the reclaiming of one data file
+        Message firstUnacked = consumer.receive(10);
+        LOG.info("first unacked: " + firstUnacked.getJMSMessageID());
+        Message secondUnacked = consumer.receive(1);
+        LOG.info("second unacked: " + secondUnacked.getJMSMessageID());
+        numMessagesAvailable -= 11;
+
+        numMessagesAvailable += produceMessagesToConsumeMultipleDataFiles(10);
+        // ensure ack is another data file
+        LOG.info("Acking firstUnacked: " + firstUnacked.getJMSMessageID());
+        firstUnacked.acknowledge();
+
+        numMessagesAvailable += produceMessagesToConsumeMultipleDataFiles(10);
+
+        consumer.receive(numMessagesAvailable).acknowledge();
+
+        // second unacked should keep first data file available but journal 
with the first ack
+        // may get whacked
+        consumer.close();
+
+        broker.stop();
+        broker.waitUntilStopped();
+
+        recoverBroker();
+
+        consumer = new StagedConsumer();
+        // need to force recovery?
+
+        Message msg = consumer.receive(1, 5);
+        assertNotNull("One messages left after recovery", msg);
+        msg.acknowledge();
+
+        // should be no more messages
+        msg = consumer.receive(1, 5);
+        assertEquals("Only one messages left after recovery: " + msg, null, 
msg);
+        consumer.close();
+    }
+
+    @Test
+    public void testAlternateLossScenario() throws Exception {
+
+        startBroker();
+        PersistenceAdapter pa  = broker.getPersistenceAdapter();
+        if (pa instanceof LevelDBStore) {
+            return;
+        }
+
+        ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
+        ActiveMQQueue disposable = new ActiveMQQueue("MyDisposableQueue");
+        ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");
+
+        // This ensure that data file 1 never goes away.
+        createInactiveDurableSub(topic);
+        assertEquals(1, getNumberOfJournalFiles());
+
+        // One Queue Message that will be acked in another data file.
+        produceMessages(queue, 1);
+        assertEquals(1, getNumberOfJournalFiles());
+
+        // Add some messages to consume space
+        produceMessages(disposable, 50);
+
+        int dataFilesCount = getNumberOfJournalFiles();
+        assertTrue(dataFilesCount > 1);
+
+        // Create an ack for the single message on this queue
+        drainQueue(queue);
+
+        // Add some more messages to consume space beyond tha data file with 
the ack
+        produceMessages(disposable, 50);
+
+        assertTrue(dataFilesCount < getNumberOfJournalFiles());
+        dataFilesCount = getNumberOfJournalFiles();
+
+        restartBroker();
+
+        // Clear out all queue data
+        broker.getAdminView().removeQueue(disposable.getQueueName());
+
+        // Once this becomes true our ack could be lost.
+        assertTrue("Less than three journal file expected, was " + 
getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getNumberOfJournalFiles() <= 3;
+            }
+        }, TimeUnit.MINUTES.toMillis(3)));
+
+        // Recover and the Message should not be replayed but if the old 
MessageAck is lost
+        // then it could be.
+        recoverBroker();
+
+        assertTrue(drainQueue(queue) == 0);
+    }
+
+    private int getNumberOfJournalFiles() throws IOException {
+
+        Collection<DataFile> files =
+            ((KahaDBPersistenceAdapter) 
broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
+        int reality = 0;
+        for (DataFile file : files) {
+            if (file != null) {
+                reality++;
+            }
+        }
+
+        return reality;
+    }
+
+    private void createInactiveDurableSub(Topic topic) throws Exception {
+        Connection connection = cf.createConnection();
+        connection.setClientID("Inactive");
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createDurableSubscriber(topic, 
"Inactive");
+        consumer.close();
+        connection.close();
+        produceMessages(topic, 1);
+    }
+
+    private int drainQueue(Queue queue) throws Exception {
+        Connection connection = cf.createConnection();
+        connection.setClientID("Inactive");
+        connection.start();
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(queue);
+        int count = 0;
+        while (consumer.receive(5000) != null) {
+            count++;
+        }
+        consumer.close();
+        connection.close();
+        return count;
+    }
+
+    private int produceMessages(Destination destination, int numToSend) throws 
Exception {
+        int sent = 0;
+        Connection connection = new ActiveMQConnectionFactory(
+                
broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
+        connection.start();
+        try {
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(destination);
+            for (int i = 0; i < numToSend; i++) {
+                producer.send(createMessage(session, i));
+                sent++;
+            }
+        } finally {
+            connection.close();
+        }
+
+        return sent;
+    }
+
+    private int produceMessagesToConsumeMultipleDataFiles(int numToSend) 
throws Exception {
+        return produceMessages(destination, numToSend);
+    }
+
+    final String payload = new String(new byte[1024]);
+
+    private Message createMessage(Session session, int i) throws Exception {
+        return session.createTextMessage(payload + "::" + i);
+    }
+
+    private class StagedConsumer {
+        Connection connection;
+        MessageConsumer consumer;
+
+        StagedConsumer() throws Exception {
+            connection = new ActiveMQConnectionFactory("failover://" +
+                    
broker.getTransportConnectors().get(0).getConnectUri().toString()).createConnection();
+            connection.start();
+            consumer = connection.createSession(false, 
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE).createConsumer(destination);
+        }
+
+        public Message receive(int numToReceive) throws Exception {
+            return receive(numToReceive, 2);
+        }
+
+        public Message receive(int numToReceive, int timeoutInSeconds) throws 
Exception {
+            Message msg = null;
+            for (; numToReceive > 0; numToReceive--) {
+
+                do  {
+                    msg = consumer.receive(1*1000);
+                } while (msg == null && --timeoutInSeconds > 0);
+
+                if (numToReceive > 1) {
+                    msg.acknowledge();
+                }
+
+                if (msg != null) {
+                    LOG.debug("received: " + msg.getJMSMessageID());
+                }
+            }
+            // last message, unacked
+            return msg;
+        }
+
+        void close() throws JMSException {
+            consumer.close();
+            connection.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2870Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2870Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2870Test.java
new file mode 100644
index 0000000..8bb6bff
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2870Test.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerView;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(value = Parameterized.class)
+public class AMQ2870Test extends org.apache.activemq.TestSupport  {
+
+    static final Logger LOG = LoggerFactory.getLogger(AMQ2870Test.class);
+    BrokerService broker = null;
+    ActiveMQTopic topic;
+
+    ActiveMQConnection consumerConnection = null, producerConnection = null;
+    Session producerSession;
+    MessageProducer producer;
+    final int minPercentUsageForStore = 10;
+    String data;
+
+    private final PersistenceAdapterChoice persistenceAdapterChoice;
+
+    @Parameterized.Parameters
+    public static Collection<PersistenceAdapterChoice[]> getTestParameters() {
+        String osName = System.getProperty("os.name");
+        LOG.info("Running on [" + osName + "]");
+        PersistenceAdapterChoice[] kahaDb = {PersistenceAdapterChoice.KahaDB};
+        PersistenceAdapterChoice[] levelDb = 
{PersistenceAdapterChoice.LevelDB};
+        List<PersistenceAdapterChoice[]> choices = new 
ArrayList<PersistenceAdapterChoice[]>();
+        choices.add(kahaDb);
+        if (!osName.equalsIgnoreCase("AIX") && 
!osName.equalsIgnoreCase("SunOS")) {
+            choices.add(levelDb);
+        }
+
+        return choices;
+    }
+
+    public AMQ2870Test(PersistenceAdapterChoice choice) {
+        this.persistenceAdapterChoice = choice;
+    }
+
+    @Test(timeout = 300000)
+    public void testSize() throws Exception {
+        openConsumer();
+
+        assertEquals(0, broker.getAdminView().getStorePercentUsage());
+
+        for (int i = 0; i < 5000; i++) {
+            sendMessage(false);
+        }
+
+        final BrokerView brokerView = broker.getAdminView();
+
+        // wait for reclaim
+        assertTrue("in range with consumer",
+                Wait.waitFor(new Wait.Condition() {
+                    @Override
+                    public boolean isSatisified() throws Exception {
+                        // usage percent updated only on send check for isFull 
so once
+                        // sends complete it is no longer updated till next 
send via a call to isFull
+                        // this is optimal as it is only used to block 
producers
+                        broker.getSystemUsage().getStoreUsage().isFull();
+                        LOG.info("store percent usage: 
"+brokerView.getStorePercentUsage());
+                        return broker.getAdminView().getStorePercentUsage() < 
minPercentUsageForStore;
+                    }
+                }));
+
+        closeConsumer();
+
+        assertTrue("in range with closed consumer",
+                Wait.waitFor(new Wait.Condition() {
+                    @Override
+                    public boolean isSatisified() throws Exception {
+                        broker.getSystemUsage().getStoreUsage().isFull();
+                        LOG.info("store precent usage: 
"+brokerView.getStorePercentUsage());
+                        return broker.getAdminView().getStorePercentUsage() < 
minPercentUsageForStore;
+                    }
+                }));
+
+        for (int i = 0; i < 5000; i++) {
+            sendMessage(false);
+        }
+
+        // What if i drop the subscription?
+        broker.getAdminView().destroyDurableSubscriber("cliID", "subName");
+
+        assertTrue("in range after send with consumer",
+                Wait.waitFor(new Wait.Condition() {
+                    @Override
+                    public boolean isSatisified() throws Exception {
+                        broker.getSystemUsage().getStoreUsage().isFull();
+                        LOG.info("store precent usage: 
"+brokerView.getStorePercentUsage());
+                        return broker.getAdminView().getStorePercentUsage() < 
minPercentUsageForStore;
+                    }
+                }));
+    }
+
+    private void openConsumer() throws Exception {
+        consumerConnection = (ActiveMQConnection) createConnection();
+        consumerConnection.setClientID("cliID");
+        consumerConnection.start();
+        Session session = consumerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        TopicSubscriber subscriber = session.createDurableSubscriber(topic, 
"subName", "filter=true", false);
+
+        subscriber.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                // received++;
+            }
+        });
+    }
+
+    private void closeConsumer() throws JMSException {
+        if (consumerConnection != null)
+            consumerConnection.close();
+        consumerConnection = null;
+    }
+
+    private void sendMessage(boolean filter) throws Exception {
+        if (producerConnection == null) {
+            producerConnection = (ActiveMQConnection) createConnection();
+            producerConnection.start();
+            producerSession = producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            producer = producerSession.createProducer(topic);
+        }
+
+        Message message = producerSession.createMessage();
+        message.setBooleanProperty("filter", filter);
+        message.setStringProperty("data", data);
+        producer.send(message);
+    }
+
+    private void startBroker(boolean deleteMessages) throws Exception {
+        broker = new BrokerService();
+        broker.setAdvisorySupport(false);
+        broker.setBrokerName("testStoreSize");
+
+        if (deleteMessages) {
+            broker.setDeleteAllMessagesOnStartup(true);
+        }
+        LOG.info("Starting broker with persistenceAdapterChoice " + 
persistenceAdapterChoice.toString());
+        setPersistenceAdapter(broker, persistenceAdapterChoice);
+        configurePersistenceAdapter(broker.getPersistenceAdapter());
+        broker.getSystemUsage().getStoreUsage().setLimit(100 * 1000 * 1000);
+        broker.start();
+    }
+
+    private void configurePersistenceAdapter(PersistenceAdapter 
persistenceAdapter) {
+        Properties properties = new Properties();
+        String maxFileLengthVal = String.valueOf(2 * 1024 * 1024);
+        properties.put("journalMaxFileLength", maxFileLengthVal);
+        properties.put("maxFileLength", maxFileLengthVal);
+        properties.put("cleanupInterval", "2000");
+        properties.put("checkpointInterval", "2000");
+
+        // leveldb
+        properties.put("logSize", maxFileLengthVal);
+
+        IntrospectionSupport.setProperties(persistenceAdapter, properties);
+    }
+
+    private void stopBroker() throws Exception {
+        if (broker != null)
+            broker.stop();
+        broker = null;
+    }
+
+    @Override
+    protected ActiveMQConnectionFactory createConnectionFactory() throws 
Exception {
+        return new 
ActiveMQConnectionFactory("vm://testStoreSize?jms.watchTopicAdvisories=false&waitForStart=5000&create=false");
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        StringBuilder sb = new StringBuilder(5000);
+        for (int i = 0; i < 5000; i++) {
+            sb.append('a');
+        }
+        data = sb.toString();
+
+        startBroker(true);
+        topic = (ActiveMQTopic) createDestination();
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        stopBroker();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2902Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2902Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2902Test.java
new file mode 100644
index 0000000..3c38186
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2902Test.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.TransportConnection;
+import org.apache.activemq.transport.TransportDisposedIOException;
+import org.apache.activemq.util.DefaultTestAppender;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.slf4j.LoggerFactory;
+
+public class AMQ2902Test extends TestCase {
+    private static final org.slf4j.Logger LOG = 
LoggerFactory.getLogger(AMQ2580Test.class);
+
+    final AtomicBoolean gotExceptionInLog = new AtomicBoolean(Boolean.FALSE);
+    final AtomicBoolean failedToFindMDC = new AtomicBoolean(Boolean.FALSE);
+
+    Appender appender = new DefaultTestAppender() {
+        @Override
+        public void doAppend(LoggingEvent event) {
+            if (event.getThrowableInformation() != null
+                    && event.getThrowableInformation().getThrowable() 
instanceof TransportDisposedIOException) {
+
+                // Prevent StackOverflowException so we can see a sane stack 
trace.
+                if (gotExceptionInLog.get()) {
+                    return;
+                }
+
+                gotExceptionInLog.set(Boolean.TRUE);
+                LOG.error("got event: " + event + ", ex:" + 
event.getThrowableInformation().getThrowable(), 
event.getThrowableInformation().getThrowable());
+                LOG.error("Event source: ", new Throwable("Here"));
+            }
+            if( !"Loaded the Bouncy Castle security 
provider.".equals(event.getMessage()) ) {
+                if (event.getMDC("activemq.broker") == null) {
+                    failedToFindMDC.set(Boolean.TRUE);
+                }
+            }
+            return;
+        }
+    };
+
+    public void testNoExceptionOnClosewithStartStop() throws JMSException {
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
+                "vm://localhost?broker.persistent=false");
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+        connection.stop();
+        connection.close();
+    }
+
+    public void testNoExceptionOnClose() throws JMSException {
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
+                "vm://localhost?broker.persistent=false");
+        Connection connection = connectionFactory.createConnection();
+        connection.close();
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        gotExceptionInLog.set(Boolean.FALSE);
+        failedToFindMDC.set(Boolean.FALSE);
+        Logger.getRootLogger().addAppender(appender);
+        Logger.getLogger(TransportConnection.class.getName() + 
".Transport").setLevel(Level.DEBUG);
+        
Logger.getLogger(TransportConnection.class.getName()).setLevel(Level.DEBUG);
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        Logger.getRootLogger().removeAppender(appender);
+        assertFalse("got unexpected ex in log on graceful close", 
gotExceptionInLog.get());
+        assertFalse("MDC is there", failedToFindMDC.get());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
new file mode 100644
index 0000000..f665431
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsMultipleClientsTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import 
org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+
+import java.util.Vector;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+
+@RunWith(BlockJUnit4ClassRunner.class)
+public class AMQ2910Test extends JmsMultipleClientsTestSupport {
+
+    final int maxConcurrency = 60;
+    final int msgCount = 200;
+    final Vector<Throwable> exceptions = new Vector<Throwable>();
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        //persistent = true;
+        BrokerService broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.addConnector("tcp://localhost:0");
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setPendingQueuePolicy(new 
FilePendingQueueMessageStoragePolicy());
+        defaultEntry.setCursorMemoryHighWaterMark(50);
+        defaultEntry.setMemoryLimit(500*1024);
+        defaultEntry.setProducerFlowControl(false);
+        policyMap.setDefaultEntry(defaultEntry);
+        broker.setDestinationPolicy(policyMap);
+
+        broker.getSystemUsage().getMemoryUsage().setLimit(1000 * 1024);
+
+        return broker;
+    }
+
+    @Test(timeout = 30 * 1000)
+    public void testConcurrentSendToPendingCursor() throws Exception {
+        final ActiveMQConnectionFactory factory =
+                new 
ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri());
+        factory.setCloseTimeout(30000);
+        ExecutorService executor = Executors.newCachedThreadPool();
+        for (int i=0; i<maxConcurrency; i++) {
+            final ActiveMQQueue dest = new ActiveMQQueue("Queue-" + i);
+            executor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        sendMessages(factory.createConnection(), dest, 
msgCount);
+                    } catch (Throwable t) {
+                        exceptions.add(t);
+                    }
+                }
+            });
+        }
+
+        executor.shutdown();
+
+        assertTrue("send completed", executor.awaitTermination(60, 
TimeUnit.SECONDS));
+        assertNoExceptions();
+
+        executor = Executors.newCachedThreadPool();
+        for (int i=0; i<maxConcurrency; i++) {
+            final ActiveMQQueue dest = new ActiveMQQueue("Queue-" + i);
+            executor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        startConsumers(factory, dest);
+                    } catch (Throwable t) {
+                        exceptions.add(t);
+                    }
+                }
+            });
+        }
+
+        executor.shutdown();
+        assertTrue("consumers completed", executor.awaitTermination(60, 
TimeUnit.SECONDS));
+
+        allMessagesList.setMaximumDuration(120*1000);
+        final int numExpected = maxConcurrency * msgCount;
+        allMessagesList.waitForMessagesToArrive(numExpected);
+
+        if (allMessagesList.getMessageCount() != numExpected) {
+            dumpAllThreads(getName());
+
+        }
+        allMessagesList.assertMessagesReceivedNoWait(numExpected);
+
+        assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+
+    }
+
+    private void assertNoExceptions() {
+        if (!exceptions.isEmpty()) {
+            for (Throwable t: exceptions) {
+                t.printStackTrace();
+            }
+        }
+        assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java
new file mode 100644
index 0000000..57469d5
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQ2982Test {
+
+    private static final int MAX_MESSAGES = 500;
+
+    private static final String QUEUE_NAME = "test.queue";
+
+    private BrokerService broker;
+
+    private final CountDownLatch messageCountDown = new 
CountDownLatch(MAX_MESSAGES);
+
+    private CleanableKahaDBStore kahaDB;
+
+    private static class CleanableKahaDBStore extends KahaDBStore {
+        // make checkpoint cleanup accessible
+        public void forceCleanup() throws IOException {
+            checkpointCleanup(true);
+        }
+
+        public int getFileMapSize() throws IOException {
+            // ensure save memory publishing, use the right lock
+            indexLock.readLock().lock();
+            try {
+                return getJournal().getFileMap().size();
+            } finally {
+                indexLock.readLock().unlock();
+            }
+        }
+    }
+
+    @Before
+    public void setup() throws Exception {
+
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(true);
+
+        kahaDB = new CleanableKahaDBStore();
+        kahaDB.setJournalMaxFileLength(256 * 1024);
+        broker.setPersistenceAdapter(kahaDB);
+
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    private Connection registerDLQMessageListener() throws Exception {
+        ConnectionFactory factory = new 
ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(session
+                
.createQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
+        consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                messageCountDown.countDown();
+            }
+        });
+
+        return connection;
+    }
+
+    class ConsumerThread extends Thread {
+
+        @Override
+        public void run() {
+            try {
+                ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory("vm://localhost");
+
+                RedeliveryPolicy policy = new RedeliveryPolicy();
+                policy.setMaximumRedeliveries(0);
+                policy.setInitialRedeliveryDelay(100);
+                policy.setUseExponentialBackOff(false);
+
+                factory.setRedeliveryPolicy(policy);
+
+                Connection connection = factory.createConnection();
+                connection.start();
+                Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+                MessageConsumer consumer = 
session.createConsumer(session.createQueue(QUEUE_NAME));
+                do {
+                    Message message = consumer.receive(300);
+                    if (message != null) {
+                        session.rollback();
+                    }
+                } while (messageCountDown.getCount() != 0);
+                consumer.close();
+                session.close();
+                connection.close();
+            } catch (Exception e) {
+                Assert.fail(e.getMessage());
+            }
+        }
+    }
+
+    private void sendMessages() throws Exception {
+        ConnectionFactory factory = new 
ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = 
session.createProducer(session.createQueue(QUEUE_NAME));
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        for (int i = 0; i < MAX_MESSAGES; i++) {
+            BytesMessage message = session.createBytesMessage();
+            message.writeBytes(new byte[1000]);
+            producer.send(message);
+        }
+        producer.close();
+        session.close();
+        connection.close();
+    }
+
+    @Test
+    public void testNoStickyKahaDbLogFilesOnLocalTransactionRollback() throws 
Exception {
+
+        Connection dlqConnection = registerDLQMessageListener();
+
+        ConsumerThread thread = new ConsumerThread();
+        thread.start();
+
+        sendMessages();
+
+        thread.join(60 * 1000);
+        assertFalse(thread.isAlive());
+
+        dlqConnection.close();
+
+        kahaDB.forceCleanup();
+
+        assertEquals("only one active KahaDB log file after cleanup is 
expected", 1, kahaDB.getFileMapSize());
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        broker.stop();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java
new file mode 100644
index 0000000..f8b941a
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQ2983Test {
+
+    private static final int MAX_CONSUMER = 10;
+
+    private static final int MAX_MESSAGES = 2000;
+
+    private static final String QUEUE_NAME = "test.queue";
+
+    private BrokerService broker;
+
+    private final CountDownLatch messageCountDown = new 
CountDownLatch(MAX_MESSAGES);
+
+    private CleanableKahaDBStore kahaDB;
+
+    private static class CleanableKahaDBStore extends KahaDBStore {
+        // make checkpoint cleanup accessible
+        public void forceCleanup() throws IOException {
+            checkpointCleanup(true);
+        }
+
+        public int getFileMapSize() throws IOException {
+            // ensure save memory publishing, use the right lock
+            indexLock.readLock().lock();
+            try {
+                return getJournal().getFileMap().size();
+            } finally {
+                indexLock.readLock().unlock();
+            }
+        }
+    }
+
+    private class ConsumerThread extends Thread {
+
+        @Override
+        public void run() {
+            try {
+                ConnectionFactory factory = new 
ActiveMQConnectionFactory("vm://localhost");
+                Connection connection = factory.createConnection();
+                connection.start();
+                Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+                MessageConsumer consumer = 
session.createConsumer(session.createQueue(QUEUE_NAME));
+                do {
+                    Message message = consumer.receive(200);
+                    if (message != null) {
+                        session.commit();
+                        messageCountDown.countDown();
+                    }
+                } while (messageCountDown.getCount() != 0);
+                consumer.close();
+                session.close();
+                connection.close();
+            } catch (Exception e) {
+                Assert.fail(e.getMessage());
+            }
+        }
+    }
+
+    @Before
+    public void setup() throws Exception {
+
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(true);
+
+        kahaDB = new CleanableKahaDBStore();
+        kahaDB.setJournalMaxFileLength(256 * 1024);
+        broker.setPersistenceAdapter(kahaDB);
+
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        broker.stop();
+    }
+
+    @Test
+    public void testNoStickyKahaDbLogFilesOnConcurrentTransactionalConsumer() 
throws Exception {
+
+        List<Thread> consumerThreads = new ArrayList<Thread>();
+        for (int i = 0; i < MAX_CONSUMER; i++) {
+            ConsumerThread thread = new ConsumerThread();
+            thread.start();
+            consumerThreads.add(thread);
+        }
+        sendMessages();
+
+        boolean allMessagesReceived = messageCountDown.await(60, 
TimeUnit.SECONDS);
+        assertTrue(allMessagesReceived);
+
+        for (Thread thread : consumerThreads) {
+            thread.join(TimeUnit.MILLISECONDS.convert(60, TimeUnit.SECONDS));
+            assertFalse(thread.isAlive());
+        }
+        kahaDB.forceCleanup();
+        assertEquals("Expect only one active KahaDB log file after cleanup", 
1, kahaDB.getFileMapSize());
+    }
+
+    private void sendMessages() throws Exception {
+        ConnectionFactory factory = new 
ActiveMQConnectionFactory("vm://localhost");
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = 
session.createProducer(session.createQueue(QUEUE_NAME));
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+        for (int i = 0; i < MAX_MESSAGES; i++) {
+            BytesMessage message = session.createBytesMessage();
+            message.writeBytes(new byte[200]);
+            producer.send(message);
+        }
+        producer.close();
+        session.close();
+        connection.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java
new file mode 100644
index 0000000..dcd5064
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.Connection;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.transport.*;
+import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This test involves the creation of a local and remote broker, both of which
+ * communicate over VM and TCP. The local broker establishes a bridge to the
+ * remote broker for the purposes of verifying that broker info is only
+ * transfered once the local broker's ID is known to the bridge support.
+ */
+public class AMQ3014Test {
+    // Change this URL to be an unused port.
+    private static final String BROKER_URL = "tcp://localhost:0";
+
+    private List<BrokerInfo> remoteBrokerInfos = Collections
+            .synchronizedList(new ArrayList<BrokerInfo>());
+
+    private BrokerService localBroker = new BrokerService();
+
+    // Override the "remote" broker so that it records all (remote) BrokerInfos
+    // that it receives.
+    private BrokerService remoteBroker = new BrokerService() {
+        @Override
+        protected TransportConnector createTransportConnector(URI brokerURI)
+                throws Exception {
+            TransportServer transport = TransportFactorySupport.bind(this, 
brokerURI);
+            return new TransportConnector(transport) {
+                @Override
+                protected Connection createConnection(Transport transport)
+                        throws IOException {
+                    Connection connection = super.createConnection(transport);
+                    final TransportListener proxiedListener = transport
+                            .getTransportListener();
+                    transport.setTransportListener(new TransportListener() {
+
+                        @Override
+                        public void onCommand(Object command) {
+                            if (command instanceof BrokerInfo) {
+                                remoteBrokerInfos.add((BrokerInfo) command);
+                            }
+                            proxiedListener.onCommand(command);
+                        }
+
+                        @Override
+                        public void onException(IOException error) {
+                            proxiedListener.onException(error);
+                        }
+
+                        @Override
+                        public void transportInterupted() {
+                            proxiedListener.transportInterupted();
+                        }
+
+                        @Override
+                        public void transportResumed() {
+                            proxiedListener.transportResumed();
+                        }
+                    });
+                    return connection;
+                }
+
+            };
+        }
+    };
+
+    @Before
+    public void init() throws Exception {
+        localBroker.setBrokerName("localBroker");
+        localBroker.setPersistent(false);
+        localBroker.setUseJmx(false);
+        localBroker.setSchedulerSupport(false);
+
+        remoteBroker.setBrokerName("remoteBroker");
+        remoteBroker.setPersistent(false);
+        remoteBroker.setUseJmx(false);
+        remoteBroker.addConnector(BROKER_URL);
+        remoteBroker.setSchedulerSupport(false);
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        try {
+            localBroker.stop();
+        } finally {
+            remoteBroker.stop();
+        }
+    }
+
+    /**
+     * This test verifies that the local broker's ID is typically known by the
+     * bridge support before the local broker's BrokerInfo is sent to the 
remote
+     * broker.
+     */
+    @Test
+    public void NormalCaseTest() throws Exception {
+        runTest(0, 3000);
+    }
+
+    /**
+     * This test verifies that timing can arise under which the local broker's
+     * ID is not known by the bridge support before the local broker's
+     * BrokerInfo is sent to the remote broker.
+     */
+    @Test
+    public void DelayedCaseTest() throws Exception {
+        runTest(500, 3000);
+    }
+
+    private void runTest(final long taskRunnerDelay, long timeout)
+            throws Exception {
+        // Add a network connector to the local broker that will create a 
bridge
+        // to the remote broker.
+        DiscoveryNetworkConnector dnc = new DiscoveryNetworkConnector();
+        SimpleDiscoveryAgent da = new SimpleDiscoveryAgent();
+        
da.setServices(remoteBroker.getTransportConnectors().get(0).getPublishableConnectString());
+        dnc.setDiscoveryAgent(da);
+        localBroker.addNetworkConnector(dnc);
+
+        // Before starting the local broker, intercept the task runner factory
+        // so that the
+        // local VMTransport dispatcher is artificially delayed.
+        final TaskRunnerFactory realTaskRunnerFactory = localBroker
+                .getTaskRunnerFactory();
+        localBroker.setTaskRunnerFactory(new TaskRunnerFactory() {
+            public TaskRunner createTaskRunner(Task task, String name) {
+                final TaskRunner realTaskRunner = realTaskRunnerFactory
+                        .createTaskRunner(task, name);
+                if (name.startsWith("ActiveMQ Connection Dispatcher: ")) {
+                    return new TaskRunner() {
+                        @Override
+                        public void shutdown() throws InterruptedException {
+                            realTaskRunner.shutdown();
+                        }
+
+                        @Override
+                        public void shutdown(long timeout)
+                                throws InterruptedException {
+                            realTaskRunner.shutdown(timeout);
+                        }
+
+                        @Override
+                        public void wakeup() throws InterruptedException {
+                            Thread.sleep(taskRunnerDelay);
+                            realTaskRunner.wakeup();
+                        }
+                    };
+                } else {
+                    return realTaskRunnerFactory.createTaskRunner(task, name);
+                }
+            }
+        });
+
+        // Start the brokers and wait for the bridge to be created; the remote
+        // broker is started first to ensure it is available for the local
+        // broker to connect to.
+        remoteBroker.start();
+        localBroker.start();
+
+        // Wait for the remote broker to receive the local broker's BrokerInfo
+        // and then verify the local broker's ID is known.
+        long startTimeMillis = System.currentTimeMillis();
+        while (remoteBrokerInfos.isEmpty()
+                && (System.currentTimeMillis() - startTimeMillis) < timeout) {
+            Thread.sleep(100);
+        }
+
+        Assert.assertFalse("Timed out waiting for bridge to form.",
+                remoteBrokerInfos.isEmpty());
+        ;
+        Assert.assertNotNull("Local broker ID is null.", remoteBrokerInfos.get(
+                0).getBrokerId());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
new file mode 100644
index 0000000..bfff0fd
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3120Test.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.ConsumerThread;
+import org.apache.activemq.util.ProducerThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.junit.Test;
+
+import javax.jms.*;
+
+import java.io.File;
+
+import static org.junit.Assert.assertEquals;
+
+public class AMQ3120Test {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AMQ3120Test.class);
+
+    BrokerService broker = null;
+    File kahaDbDir = null;
+    private final Destination destination = new ActiveMQQueue("AMQ3120Test");
+    final String payload = new String(new byte[1024]);
+
+    protected void startBroker(boolean delete) throws Exception {
+        broker = new BrokerService();
+
+        //Start with a clean directory
+        kahaDbDir = new File(broker.getBrokerDataDirectory(), "KahaDB");
+        deleteDir(kahaDbDir);
+
+        broker.setSchedulerSupport(false);
+        broker.setDeleteAllMessagesOnStartup(delete);
+        broker.setPersistent(true);
+        broker.setUseJmx(false);
+        broker.addConnector("tcp://localhost:0");
+
+        PolicyMap map = new PolicyMap();
+        PolicyEntry entry = new PolicyEntry();
+        entry.setUseCache(false);
+        map.setDefaultEntry(entry);
+        broker.setDestinationPolicy(map);
+
+        configurePersistence(broker, delete);
+
+        broker.start();
+        LOG.info("Starting broker..");
+    }
+
+    protected void configurePersistence(BrokerService brokerService, boolean 
deleteAllOnStart) throws Exception {
+        KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) 
brokerService.getPersistenceAdapter();
+
+        // ensure there are a bunch of data files but multiple entries in each
+        adapter.setJournalMaxFileLength(1024 * 20);
+
+        // speed up the test case, checkpoint an cleanup early and often
+        adapter.setCheckpointInterval(500);
+        adapter.setCleanupInterval(500);
+
+        if (!deleteAllOnStart) {
+            adapter.setForceRecoverIndex(true);
+        }
+
+    }
+
+    private boolean deleteDir(File dir) {
+        if (dir.isDirectory()) {
+            String[] children = dir.list();
+            for (int i = 0; i < children.length; i++) {
+                boolean success = deleteDir(new File(dir, children[i]));
+                if (!success) {
+                    return false;
+                }
+            }
+        }
+
+        return dir.delete();
+    }
+
+    private int getFileCount(File dir){
+        if (dir.isDirectory()) {
+            String[] children = dir.list();
+            return children.length;
+        }
+
+        return 0;
+    }
+
+    @Test
+    public void testCleanupOfFiles() throws Exception {
+        final int messageCount = 500;
+        startBroker(true);
+        int fileCount = getFileCount(kahaDbDir);
+        assertEquals(4, fileCount);
+
+        Connection connection = new ActiveMQConnectionFactory(
+                
broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
+        connection.start();
+        Session producerSess = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Session consumerSess = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+        ProducerThread producer = new ProducerThread(producerSess, 
destination) {
+            @Override
+            protected Message createMessage(int i) throws Exception {
+                return sess.createTextMessage(payload + "::" + i);
+            }
+        };
+        producer.setSleep(650);
+        producer.setMessageCount(messageCount);
+        ConsumerThread consumer = new ConsumerThread(consumerSess, 
destination);
+        consumer.setBreakOnNull(false);
+        consumer.setMessageCount(messageCount);
+
+        producer.start();
+        consumer.start();
+
+        producer.join();
+        consumer.join();
+
+        assertEquals("consumer got all produced messages", 
producer.getMessageCount(), consumer.getReceived());
+
+        broker.stop();
+        broker.waitUntilStopped();
+
+    }
+
+}

Reply via email to