http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
new file mode 100644
index 0000000..491a585
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java
@@ -0,0 +1,586 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.util.ArrayList;
+
+import javax.jms.DeliveryMode;
+
+import junit.framework.Test;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.XATransactionId;
+
+/**
+ * Used to simulate the recovery that occurs when a broker shuts down.
+ * 
+ * 
+ */
+public class RecoveryBrokerTest extends BrokerRestartTestSupport {
+
+    /**
+     * Used to verify that after a broker restart durable subscriptions that 
use
+     * wild cards are still wild card subscription after broker restart.
+     * 
+     * @throws Exception
+     */
+    //need to revist!!!
+    public void XtestWildCardSubscriptionPreservedOnRestart() throws Exception 
{
+        ActiveMQDestination dest1 = new ActiveMQTopic("TEST.A");
+        ActiveMQDestination dest2 = new ActiveMQTopic("TEST.B");
+        ActiveMQDestination dest3 = new ActiveMQTopic("TEST.C");
+        ActiveMQDestination wildDest = new ActiveMQTopic("TEST.>");
+
+        ArrayList<MessageId> sentBeforeRestart = new ArrayList<MessageId>();
+        ArrayList<MessageId> sentBeforeCreateConsumer = new 
ArrayList<MessageId>();
+        ArrayList<MessageId> sentAfterCreateConsumer = new 
ArrayList<MessageId>();
+
+        // Setup a first connection
+        {
+            StubConnection connection1 = createConnection();
+            ConnectionInfo connectionInfo1 = createConnectionInfo();
+            connectionInfo1.setClientId("A");
+            SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+            ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+            connection1.send(connectionInfo1);
+            connection1.send(sessionInfo1);
+            connection1.send(producerInfo1);
+
+            // Create the durable subscription.
+            ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, 
wildDest);
+            consumerInfo1.setSubscriptionName("test");
+            consumerInfo1.setPrefetchSize(100);
+            connection1.send(consumerInfo1);
+
+            // Close the subscription.
+            connection1.send(closeConsumerInfo(consumerInfo1));
+
+            // Send the messages
+            for (int i = 0; i < 4; i++) {
+                Message m = createMessage(producerInfo1, dest1, 
DeliveryMode.PERSISTENT);
+                connection1.send(m);
+                sentBeforeRestart.add(m.getMessageId());
+            }
+            connection1.request(closeConnectionInfo(connectionInfo1));
+            connection1.stop();
+        }
+
+        // Restart the broker.
+        restartBroker();
+
+        // Get a connection to the new broker.
+        {
+            StubConnection connection2 = createConnection();
+            ConnectionInfo connectionInfo2 = createConnectionInfo();
+            connectionInfo2.setClientId("A");
+            SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+            connection2.send(connectionInfo2);
+            connection2.send(sessionInfo2);
+
+            ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+            connection2.send(producerInfo2);
+
+            // Send messages before the durable subscription is re-activated.
+            for (int i = 0; i < 4; i++) {
+                Message m = createMessage(producerInfo2, dest2, 
DeliveryMode.PERSISTENT);
+                connection2.send(m);
+                sentBeforeCreateConsumer.add(m.getMessageId());
+            }
+
+            // Re-open the subscription.
+            ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, 
wildDest);
+            consumerInfo2.setSubscriptionName("test");
+            consumerInfo2.setPrefetchSize(100);
+            connection2.send(consumerInfo2);
+
+            // Send messages after the subscription is activated.
+            for (int i = 0; i < 4; i++) {
+                Message m = createMessage(producerInfo2, dest3, 
DeliveryMode.PERSISTENT);
+                connection2.send(m);
+                sentAfterCreateConsumer.add(m.getMessageId());
+            }
+
+            // We should get the recovered messages...
+            for (int i = 0; i < 4; i++) {
+                Message m2 = receiveMessage(connection2);
+                assertNotNull("Recovered message missing: " + i, m2);
+                assertEquals(sentBeforeRestart.get(i), m2.getMessageId());
+            }
+
+            // We should get get the messages that were sent before the sub was
+            // reactivated.
+            for (int i = 0; i < 4; i++) {
+                Message m2 = receiveMessage(connection2);
+                assertNotNull("Before activated message missing: " + i, m2);
+                assertEquals(sentBeforeCreateConsumer.get(i), 
m2.getMessageId());
+            }
+
+            // We should get get the messages that were sent after the sub was
+            // reactivated.
+            for (int i = 0; i < 4; i++) {
+                Message m2 = receiveMessage(connection2);
+                assertNotNull("After activated message missing: " + i, m2);
+                assertEquals("" + i, sentAfterCreateConsumer.get(i), 
m2.getMessageId());
+            }
+
+            assertNoMessagesLeft(connection2);
+        }
+
+    }
+
+    public void testConsumedQueuePersistentMessagesLostOnRestart() throws 
Exception {
+
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        for (int i = 0; i < 4; i++) {
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            connection.send(message);
+        }
+
+        // Setup the consumer and receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, 
destination);
+        connection.send(consumerInfo);
+
+        // The we should get the messages.
+        for (int i = 0; i < 4; i++) {
+            Message m2 = receiveMessage(connection);
+            assertNotNull(m2);
+        }
+
+        // restart the broker.
+        restartBroker();
+
+        // No messages should be delivered.
+        Message m = receiveMessage(connection);
+        assertNull(m);
+    }
+
+    public void testQueuePersistentUncommitedMessagesLostOnRestart() throws 
Exception {
+
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        // Begin the transaction.
+        LocalTransactionId txid = createLocalTransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        for (int i = 0; i < 4; i++) {
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            message.setTransactionId(txid);
+            connection.send(message);
+        }
+
+        // Don't commit
+
+        // restart the broker.
+        restartBroker();
+
+        // Setup the consumer and receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, 
destination);
+        connection.send(consumerInfo);
+
+        // No messages should be delivered.
+        Message m = receiveMessage(connection);
+        assertNull(m);
+    }
+    
+    public void testTopicDurableConsumerHoldsPersistentMessageAfterRestart() 
throws Exception {
+
+        ActiveMQDestination destination = new ActiveMQTopic("TEST");
+
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        connectionInfo1.setClientId("A");
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo1);
+
+        // Create the durable subscription.
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, 
destination);
+        consumerInfo1.setSubscriptionName("test");
+        consumerInfo1.setPrefetchSize(100);
+        connection1.send(consumerInfo1);
+
+        // Close the subscription.
+        connection1.send(closeConsumerInfo(consumerInfo1));
+
+        // Send the messages
+        connection1.send(createMessage(producerInfo1, destination, 
DeliveryMode.PERSISTENT));
+        connection1.send(createMessage(producerInfo1, destination, 
DeliveryMode.PERSISTENT));
+        connection1.send(createMessage(producerInfo1, destination, 
DeliveryMode.PERSISTENT));
+        connection1.send(createMessage(producerInfo1, destination, 
DeliveryMode.PERSISTENT));
+        connection1.request(closeConnectionInfo(connectionInfo1));
+        // Restart the broker.
+        restartBroker();
+
+        // Get a connection to the new broker.
+        StubConnection connection2 = createConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        connectionInfo2.setClientId("A");
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+
+        // Re-open the subscription.
+        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, 
destination);
+        consumerInfo2.setSubscriptionName("test");
+        consumerInfo2.setPrefetchSize(100);
+        connection2.send(consumerInfo2);
+
+        // The we should get the messages.
+        for (int i = 0; i < 4; i++) {
+            Message m2 = receiveMessage(connection2);
+            assertNotNull("Did not get message "+i, m2);
+        }
+        assertNoMessagesLeft(connection2);
+    }
+
+    public void testQueuePersistentMessagesNotLostOnRestart() throws Exception 
{
+
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+        Message message = createMessage(producerInfo, destination);
+        message.setPersistent(true);
+        connection.send(message);
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        // restart the broker.
+        restartBroker();
+
+        // Setup the consumer and receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, 
destination);
+        connection.send(consumerInfo);
+
+        // Message should have been dropped due to broker restart.
+        Message m = receiveMessage(connection);
+        assertNotNull("Should have received a message by now!", m);
+        assertEquals(m.getMessageId(), message.getMessageId());
+    }
+
+    public void testQueueNonPersistentMessagesLostOnRestart() throws Exception 
{
+
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+        Message message = createMessage(producerInfo, destination);
+        message.setPersistent(false);
+        connection.send(message);
+
+        // restart the broker.
+        restartBroker();
+
+        // Setup the consumer and receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, 
destination);
+        connection.send(consumerInfo);
+
+        // Message should have been dropped due to broker restart.
+        assertNoMessagesLeft(connection);
+    }
+
+    public void testQueuePersistentCommitedMessagesNotLostOnRestart() throws 
Exception {
+
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        // Begin the transaction.
+        LocalTransactionId txid = createLocalTransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        for (int i = 0; i < 4; i++) {
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            message.setTransactionId(txid);
+            connection.send(message);
+        }
+
+        // Commit
+        connection.send(createCommitTransaction1Phase(connectionInfo, txid));
+        connection.request(closeConnectionInfo(connectionInfo));
+        // restart the broker.
+        restartBroker();
+
+        // Setup the consumer and receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, 
destination);
+        connection.send(consumerInfo);
+
+        for (int i = 0; i < 4; i++) {
+            Message m = receiveMessage(connection);
+            assertNotNull(m);
+        }
+
+        assertNoMessagesLeft(connection);
+    }
+    
+
+    public void testQueuePersistentCommitedAcksNotLostOnRestart() throws 
Exception {
+
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        for (int i = 0; i < 4; i++) {
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            connection.send(message);
+        }
+
+        // Setup the consumer and receive the message.
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, 
destination);
+        connection.send(consumerInfo);
+
+        // Begin the transaction.
+        LocalTransactionId txid = createLocalTransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+        for (int i = 0; i < 4; i++) {
+            Message m = receiveMessage(connection);
+            assertNotNull(m);
+            MessageAck ack = createAck(consumerInfo, m, 1, 
MessageAck.STANDARD_ACK_TYPE);
+            ack.setTransactionId(txid);
+            connection.send(ack);
+        }
+        // Commit
+        connection.send(createCommitTransaction1Phase(connectionInfo, txid));
+        connection.request(closeConnectionInfo(connectionInfo));
+        // restart the broker.
+        restartBroker();
+
+        // Setup the consumer and receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // No messages should be delivered.
+        Message m = receiveMessage(connection);
+        assertNull(m);
+    }
+    
+    
+
+    public void testQueuePersistentUncommitedAcksLostOnRestart() throws 
Exception {
+
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        for (int i = 0; i < 4; i++) {
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            connection.send(message);
+        }
+
+        // Setup the consumer and receive the message.
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, 
destination);
+        connection.send(consumerInfo);
+
+        // Begin the transaction.
+        LocalTransactionId txid = createLocalTransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+        for (int i = 0; i < 4; i++) {
+            Message m = receiveMessage(connection);
+            assertNotNull(m);
+            MessageAck ack = createAck(consumerInfo, m, 1, 
MessageAck.STANDARD_ACK_TYPE);
+            ack.setTransactionId(txid);
+            connection.send(ack);
+        }
+        // Don't commit
+
+        // restart the broker.
+        restartBroker();
+
+        // Setup the consumer and receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // All messages should be re-delivered.
+        for (int i = 0; i < 4; i++) {
+            Message m = receiveMessage(connection);
+            assertNotNull(m);
+        }
+
+        assertNoMessagesLeft(connection);
+    }
+    
+    public void testQueuePersistentXAUncommitedAcksLostOnRestart() throws 
Exception {
+        int NUMBER = 100;
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        for (int i = 0; i < NUMBER; i++) {
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            connection.send(message);
+        }
+
+        // Setup the consumer and receive the message.
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, 
destination);
+        connection.send(consumerInfo);
+
+        // Begin the transaction.
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+        Message m = null;
+        for (int i = 0; i < NUMBER; i++) {
+            m = receiveMessage(connection);
+            assertNotNull(m);
+        }
+        MessageAck ack = createAck(consumerInfo, m, NUMBER, 
MessageAck.STANDARD_ACK_TYPE);
+        ack.setTransactionId(txid);
+        connection.send(ack);
+
+        // Don't commit
+
+        // restart the broker.
+        restartBroker();
+
+        // Setup the consumer and receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // All messages should be re-delivered.
+        for (int i = 0; i < NUMBER; i++) {
+            m = receiveMessage(connection);
+            assertNotNull(m);
+        }
+
+        assertNoMessagesLeft(connection);
+    }
+
+    public static Test suite() {
+        return suite(RecoveryBrokerTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java
new file mode 100644
index 0000000..032934b
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartTest.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.util.Arrays;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.TopicSubscriber;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.transport.failover.FailoverTransport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(value = Parameterized.class)
+public class RedeliveryRestartTest extends TestSupport {
+
+    private static final transient Logger LOG = 
LoggerFactory.getLogger(RedeliveryRestartTest.class);
+    ActiveMQConnection connection;
+    BrokerService broker = null;
+    String queueName = "redeliveryRestartQ";
+
+    @Parameterized.Parameter
+    public TestSupport.PersistenceAdapterChoice persistenceAdapterChoice = 
PersistenceAdapterChoice.KahaDB;
+
+    @Parameterized.Parameters(name="Store={0}")
+    public static Iterable<Object[]> data() {
+        return Arrays.asList(new 
Object[][]{{TestSupport.PersistenceAdapterChoice.KahaDB},{TestSupport.PersistenceAdapterChoice.JDBC},{TestSupport.PersistenceAdapterChoice.LevelDB}});
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        broker = new BrokerService();
+        configureBroker(broker);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.start();
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        broker.stop();
+        super.tearDown();
+    }
+
+    protected void configureBroker(BrokerService broker) throws Exception {
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policy = new PolicyEntry();
+        policy.setPersistJMSRedelivered(true);
+        policyMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(policyMap);
+        setPersistenceAdapter(broker, persistenceAdapterChoice);
+        broker.addConnector("tcp://0.0.0.0:0");
+    }
+
+    @org.junit.Test
+    public void testValidateRedeliveryFlagAfterRestartNoTx() throws Exception {
+
+        ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory("failover:(" + 
broker.getTransportConnectors().get(0).getPublishableConnectString()
+            + ")?jms.prefetchPolicy.all=0");
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+        Destination destination = session.createQueue(queueName);
+        populateDestination(10, destination, connection);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        TextMessage msg = null;
+        for (int i = 0; i < 5; i++) {
+            msg = (TextMessage) consumer.receive(20000);
+            LOG.info("not redelivered? got: " + msg);
+            assertNotNull("got the message", msg);
+            assertEquals("first delivery", 1, 
msg.getLongProperty("JMSXDeliveryCount"));
+            assertEquals("not a redelivery", false, msg.getJMSRedelivered());
+        }
+        consumer.close();
+
+        restartBroker();
+
+        // make failover aware of the restarted auto assigned port
+        connection.getTransport().narrow(FailoverTransport.class).add(true, 
broker.getTransportConnectors().get(0)
+                .getPublishableConnectString());
+
+        consumer = session.createConsumer(destination);
+        for (int i = 0; i < 5; i++) {
+            msg = (TextMessage) consumer.receive(4000);
+            LOG.info("redelivered? got: " + msg);
+            assertNotNull("got the message again", msg);
+            assertEquals("re delivery flag", true, msg.getJMSRedelivered());
+            assertEquals("redelivery count survives restart", 2, 
msg.getLongProperty("JMSXDeliveryCount"));
+            msg.acknowledge();
+        }
+
+        // consume the rest that were not redeliveries
+        for (int i = 0; i < 5; i++) {
+            msg = (TextMessage) consumer.receive(20000);
+            LOG.info("not redelivered? got: " + msg);
+            assertNotNull("got the message", msg);
+            assertEquals("not a redelivery", false, msg.getJMSRedelivered());
+            assertEquals("first delivery", 1, 
msg.getLongProperty("JMSXDeliveryCount"));
+            msg.acknowledge();
+        }
+        connection.close();
+    }
+
+    @org.junit.Test
+    public void testDurableSubRedeliveryFlagAfterRestartNotSupported() throws 
Exception {
+
+        ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory("failover:(" + 
broker.getTransportConnectors().get(0).getPublishableConnectString()
+            + ")?jms.prefetchPolicy.all=0");
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.setClientID("id");
+        connection.start();
+
+        Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+        ActiveMQTopic destination = new ActiveMQTopic(queueName);
+
+        TopicSubscriber durableSub = 
session.createDurableSubscriber(destination, "id");
+
+        populateDestination(10, destination, connection);
+
+        TextMessage msg = null;
+        for (int i = 0; i < 5; i++) {
+            msg = (TextMessage) durableSub.receive(20000);
+            LOG.info("not redelivered? got: " + msg);
+            assertNotNull("got the message", msg);
+            assertEquals("first delivery", 1, 
msg.getLongProperty("JMSXDeliveryCount"));
+            assertEquals("not a redelivery", false, msg.getJMSRedelivered());
+        }
+        durableSub.close();
+
+        restartBroker();
+
+        // make failover aware of the restarted auto assigned port
+        connection.getTransport().narrow(FailoverTransport.class).add(true, 
broker.getTransportConnectors().get(0)
+                .getPublishableConnectString());
+
+        durableSub = session.createDurableSubscriber(destination, "id");
+        for (int i = 0; i < 10; i++) {
+            msg = (TextMessage) durableSub.receive(4000);
+            LOG.info("redelivered? got: " + msg);
+            assertNotNull("got the message again", msg);
+            assertEquals("no reDelivery flag", false, msg.getJMSRedelivered());
+            msg.acknowledge();
+        }
+        connection.close();
+    }
+
+    @org.junit.Test
+    public void testValidateRedeliveryFlagAfterRestart() throws Exception {
+
+        ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory("failover:(" + 
broker.getTransportConnectors().get(0).getPublishableConnectString()
+            + ")?jms.prefetchPolicy.all=0");
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+        Destination destination = session.createQueue(queueName);
+        populateDestination(10, destination, connection);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        TextMessage msg = null;
+        for (int i = 0; i < 5; i++) {
+            msg = (TextMessage) consumer.receive(20000);
+            LOG.info("not redelivered? got: " + msg);
+            assertNotNull("got the message", msg);
+            assertEquals("first delivery", 1, 
msg.getLongProperty("JMSXDeliveryCount"));
+            assertEquals("not a redelivery", false, msg.getJMSRedelivered());
+        }
+        session.rollback();
+        consumer.close();
+
+        restartBroker();
+
+        // make failover aware of the restarted auto assigned port
+        connection.getTransport().narrow(FailoverTransport.class).add(true, 
broker.getTransportConnectors().get(0)
+                .getPublishableConnectString());
+
+        consumer = session.createConsumer(destination);
+        for (int i = 0; i < 5; i++) {
+            msg = (TextMessage) consumer.receive(4000);
+            LOG.info("redelivered? got: " + msg);
+            assertNotNull("got the message again", msg);
+            assertEquals("redelivery count survives restart", 2, 
msg.getLongProperty("JMSXDeliveryCount"));
+            assertEquals("re delivery flag", true, msg.getJMSRedelivered());
+        }
+        session.commit();
+
+        // consume the rest that were not redeliveries
+        for (int i = 0; i < 5; i++) {
+            msg = (TextMessage) consumer.receive(20000);
+            LOG.info("not redelivered? got: " + msg);
+            assertNotNull("got the message", msg);
+            assertEquals("first delivery", 1, 
msg.getLongProperty("JMSXDeliveryCount"));
+            assertEquals("not a redelivery", false, msg.getJMSRedelivered());
+        }
+        session.commit();
+
+        connection.close();
+    }
+
+    @org.junit.Test
+    public void testValidateRedeliveryFlagAfterRecovery() throws Exception {
+        ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()
+            + "?jms.prefetchPolicy.all=0");
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+        Destination destination = session.createQueue(queueName);
+        populateDestination(1, destination, connection);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+        TextMessage msg = (TextMessage) consumer.receive(5000);
+        LOG.info("got: " + msg);
+        assertNotNull("got the message", msg);
+        assertEquals("first delivery", 1, 
msg.getLongProperty("JMSXDeliveryCount"));
+        assertEquals("not a redelivery", false, msg.getJMSRedelivered());
+
+        stopBrokerWithStoreFailure(broker, persistenceAdapterChoice);
+
+        broker = createRestartedBroker();
+        broker.start();
+
+        connection.close();
+
+        connectionFactory = new 
ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+
+        session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        consumer = session.createConsumer(destination);
+        msg = (TextMessage) consumer.receive(10000);
+        assertNotNull("got the message again", msg);
+        assertEquals("redelivery count survives restart", 2, 
msg.getLongProperty("JMSXDeliveryCount"));
+        assertEquals("re delivery flag", true, msg.getJMSRedelivered());
+
+        session.commit();
+        connection.close();
+    }
+
+    private void restartBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+        broker = createRestartedBroker();
+        broker.start();
+    }
+
+    private BrokerService createRestartedBroker() throws Exception {
+        broker = new BrokerService();
+        configureBroker(broker);
+        return broker;
+    }
+
+    private void populateDestination(final int nbMessages, final Destination 
destination, javax.jms.Connection connection) throws JMSException {
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        for (int i = 1; i <= nbMessages; i++) {
+            producer.send(session.createTextMessage("<hello id='" + i + 
"'/>"));
+        }
+        producer.close();
+        session.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
new file mode 100644
index 0000000..5d8b62e
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java
@@ -0,0 +1,472 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.scheduler.JobSchedulerStore;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.ProxyMessageStore;
+import org.apache.activemq.store.ProxyTopicMessageStore;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.usage.SystemUsage;
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RedeliveryRestartWithExceptionTest extends TestSupport {
+
+    private static final transient Logger LOG = 
LoggerFactory.getLogger(RedeliveryRestartWithExceptionTest.class);
+    ActiveMQConnection connection;
+    BrokerService broker = null;
+    String queueName = "redeliveryRestartQ";
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        broker = new BrokerService();
+        configureBroker(broker, true);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.start();
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+        broker.stop();
+        super.tearDown();
+    }
+
+    protected void configureBroker(BrokerService broker, boolean 
throwExceptionOnUpdate) throws Exception {
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policy = new PolicyEntry();
+        policy.setPersistJMSRedelivered(true);
+        policyMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(policyMap);
+        broker.setPersistenceAdapter(new 
KahaDBWithUpdateExceptionPersistenceAdapter(throwExceptionOnUpdate));
+        broker.addConnector("tcp://0.0.0.0:0");
+    }
+
+    @org.junit.Test
+    public void testValidateRedeliveryFlagAfterRestart() throws Exception {
+
+        ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()
+            + "?jms.prefetchPolicy.all=0");
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+        Destination destination = session.createQueue(queueName);
+        populateDestination(10, destination, connection, true);
+        TextMessage msg = null;
+        MessageConsumer consumer = session.createConsumer(destination);
+        Exception expectedException = null;
+        try {
+            for (int i = 0; i < 5; i++) {
+                msg = (TextMessage) consumer.receive(5000);
+                LOG.info("not redelivered? got: " + msg);
+                assertNotNull("got the message", msg);
+                assertTrue("Should not receive the 5th message", i < 4);
+                //The first 4 messages will be ok but the 5th one should hit 
an exception in updateMessage and should not be delivered
+            }
+        } catch (Exception e) {
+            //Expecting an exception and disconnect on the 5th message
+            LOG.info("Got expected:", e);
+            expectedException = e;
+        }
+        assertNotNull("Expecting an exception when updateMessage fails", 
expectedException);                
+        
+        consumer.close();
+        connection.close();
+        
+        restartBroker();
+        
+        connectionFactory = new 
ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()
+            + "?jms.prefetchPolicy.all=0");
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        destination = session.createQueue(queueName);
+        consumer = session.createConsumer(destination);
+        
+        
+        // consume the messages that were previously delivered
+        for (int i = 0; i < 4; i++) {
+            msg = (TextMessage) consumer.receive(4000);
+            LOG.info("redelivered? got: " + msg);
+            assertNotNull("got the message again", msg);
+            assertEquals("re delivery flag", true, msg.getJMSRedelivered());
+            assertTrue("redelivery count survives restart", 
msg.getLongProperty("JMSXDeliveryCount") > 1);
+            msg.acknowledge();
+        }
+        
+
+        // consume the rest that were not redeliveries
+        for (int i = 0; i < 6; i++) {
+            msg = (TextMessage) consumer.receive(4000);
+            LOG.info("not redelivered? got: " + msg);
+            assertNotNull("got the message", msg);
+            assertEquals("not a redelivery", false, msg.getJMSRedelivered());
+            assertEquals("first delivery", 1, 
msg.getLongProperty("JMSXDeliveryCount"));
+            msg.acknowledge();
+        }
+        connection.close();
+    }
+
+
+    @org.junit.Test
+    public void 
testValidateRedeliveryFlagAfterTransientFailureConnectionDrop() throws 
Exception {
+
+        ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()
+            + "?jms.prefetchPolicy.all=0");
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+        Destination destination = session.createQueue(queueName);
+        populateDestination(10, destination, connection, true);
+        TextMessage msg = null;
+        MessageConsumer consumer = session.createConsumer(destination);
+        Exception expectedException = null;
+        try {
+            for (int i = 0; i < 5; i++) {
+                msg = (TextMessage) consumer.receive(5000);
+                LOG.info("not redelivered? got: " + msg);
+                assertNotNull("got the message", msg);
+                assertTrue("Should not receive the 5th message", i < 4);
+                //The first 4 messages will be ok but the 5th one should hit 
an exception in updateMessage and should not be delivered
+            }
+        } catch (Exception e) {
+            //Expecting an exception and disconnect on the 5th message
+            LOG.info("Got expected:", e);
+            expectedException = e;
+        }
+        assertNotNull("Expecting an exception when updateMessage fails", 
expectedException);
+
+        consumer.close();
+        connection.close();
+
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        destination = session.createQueue(queueName);
+        consumer = session.createConsumer(destination);
+
+
+        // consume the messages that were previously delivered
+        for (int i = 0; i < 4; i++) {
+            msg = (TextMessage) consumer.receive(4000);
+            LOG.info("redelivered? got: " + msg);
+            assertNotNull("got the message again", msg);
+            assertEquals("re delivery flag on:" + i, true, 
msg.getJMSRedelivered());
+            assertTrue("redelivery count survives reconnect for:" + i, 
msg.getLongProperty("JMSXDeliveryCount") > 1);
+            msg.acknowledge();
+        }
+
+
+        // consume the rest that were not redeliveries
+        for (int i = 0; i < 6; i++) {
+            msg = (TextMessage) consumer.receive(4000);
+            LOG.info("not redelivered? got: " + msg);
+            assertNotNull("got the message", msg);
+            assertEquals("not a redelivery", false, msg.getJMSRedelivered());
+            assertEquals("first delivery", 1, 
msg.getLongProperty("JMSXDeliveryCount"));
+            msg.acknowledge();
+        }
+        connection.close();
+    }
+
+    @org.junit.Test
+    public void 
testValidateRedeliveryFlagOnNonPersistentAfterTransientFailureConnectionDrop() 
throws Exception {
+
+        ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()
+            + "?jms.prefetchPolicy.all=0");
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+        Destination destination = session.createQueue(queueName);
+        populateDestination(10, destination, connection, false);
+        TextMessage msg = null;
+        MessageConsumer consumer = session.createConsumer(destination);
+        for (int i = 0; i < 5; i++) {
+            msg = (TextMessage) consumer.receive(5000);
+            assertNotNull("got the message", msg);
+            assertFalse("not redelivered", msg.getJMSRedelivered());
+        }
+
+        
connection.getTransport().narrow(TcpTransport.class).getTransportListener().onException(new
 IOException("Die"));
+
+        connection = (ActiveMQConnection) connectionFactory.createConnection();
+        connection.start();
+
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        destination = session.createQueue(queueName);
+        consumer = session.createConsumer(destination);
+
+        // consume the messages that were previously delivered
+        for (int i = 0; i < 5; i++) {
+            msg = (TextMessage) consumer.receive(4000);
+            LOG.info("redelivered? got: " + msg);
+            assertNotNull("got the message again", msg);
+            assertEquals("redelivery flag set on:" + i, true, 
msg.getJMSRedelivered());
+            assertTrue("redelivery count survives reconnect for:" + i, 
msg.getLongProperty("JMSXDeliveryCount") > 1);
+            msg.acknowledge();
+        }
+
+        // consume the rest that were not redeliveries
+        for (int i = 0; i < 5; i++) {
+            msg = (TextMessage) consumer.receive(4000);
+            LOG.info("not redelivered? got: " + msg);
+            assertNotNull("got the message", msg);
+            assertEquals("not a redelivery", false, msg.getJMSRedelivered());
+            assertEquals("first delivery", 1, 
msg.getLongProperty("JMSXDeliveryCount"));
+            msg.acknowledge();
+        }
+        connection.close();
+    }
+
+    private void restartBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+        broker = createRestartedBroker();
+        broker.start();
+    }
+
+    private BrokerService createRestartedBroker() throws Exception {
+        broker = new BrokerService();
+        configureBroker(broker, false);
+        return broker;
+    }
+
+    private void populateDestination(final int nbMessages, final Destination 
destination, javax.jms.Connection connection, boolean persistent) throws 
JMSException {
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : 
DeliveryMode.NON_PERSISTENT);
+        for (int i = 1; i <= nbMessages; i++) {
+            producer.send(session.createTextMessage("<hello id='" + i + 
"'/>"));
+        }
+        producer.close();
+        session.close();
+    }
+    
+    private class KahaDBWithUpdateExceptionPersistenceAdapter implements 
PersistenceAdapter {
+        
+        private KahaDBPersistenceAdapter kahaDB = new 
KahaDBPersistenceAdapter();
+        private boolean throwExceptionOnUpdate;
+        
+        public KahaDBWithUpdateExceptionPersistenceAdapter(boolean 
throwExceptionOnUpdate) {
+            this.throwExceptionOnUpdate = throwExceptionOnUpdate;
+        }
+        
+        @Override
+        public void start() throws Exception {
+            kahaDB.start();
+        }
+
+        @Override
+        public void stop() throws Exception {
+            kahaDB.stop();
+        }
+
+        @Override
+        public Set<ActiveMQDestination> getDestinations() {
+            return kahaDB.getDestinations();
+        }
+
+        @Override
+        public MessageStore createQueueMessageStore(ActiveMQQueue destination) 
+                throws IOException {
+            MessageStore proxyMessageStoreWithException = new 
ProxyMessageStoreWithUpdateException(
+                    kahaDB.createQueueMessageStore(destination), 
throwExceptionOnUpdate);
+            return proxyMessageStoreWithException;
+        }
+
+        @Override
+        public TopicMessageStore createTopicMessageStore(
+                ActiveMQTopic destination) throws IOException {
+            TopicMessageStore proxyMessageStoreWithException = new 
ProxyTopicMessageStoreWithUpdateException(
+                    kahaDB.createTopicMessageStore(destination), 
throwExceptionOnUpdate);
+            return proxyMessageStoreWithException;
+        }
+        
+        @Override
+        public JobSchedulerStore createJobSchedulerStore() throws IOException, 
UnsupportedOperationException {
+            return kahaDB.createJobSchedulerStore();
+        }
+
+        @Override
+        public void removeQueueMessageStore(ActiveMQQueue destination) {
+            kahaDB.removeQueueMessageStore(destination);
+        }
+
+        @Override
+        public void removeTopicMessageStore(ActiveMQTopic destination) {
+            kahaDB.removeTopicMessageStore(destination);
+        }
+
+        @Override
+        public TransactionStore createTransactionStore() throws IOException {
+            return kahaDB.createTransactionStore();
+        }
+
+        @Override
+        public void beginTransaction(ConnectionContext context)
+                throws IOException {
+            kahaDB.beginTransaction(context);
+        }
+
+        @Override
+        public void commitTransaction(ConnectionContext context)
+                throws IOException {
+            kahaDB.commitTransaction(context);
+        }
+
+        @Override
+        public void rollbackTransaction(ConnectionContext context)
+                throws IOException {
+            kahaDB.rollbackTransaction(context);
+        }
+
+        @Override
+        public long getLastMessageBrokerSequenceId() throws IOException {
+            return kahaDB.getLastMessageBrokerSequenceId();
+        }
+
+        @Override
+        public void deleteAllMessages() throws IOException {
+            kahaDB.deleteAllMessages();
+        }
+
+        @Override
+        public void setUsageManager(SystemUsage usageManager) {
+            kahaDB.setUsageManager(usageManager);
+        }
+
+        @Override
+        public void setBrokerName(String brokerName) {
+            kahaDB.setBrokerName(brokerName);
+        }
+
+        @Override
+        public void setDirectory(File dir) {
+            kahaDB.setDirectory(dir);
+        }
+
+        @Override
+        public File getDirectory() {
+            return kahaDB.getDirectory();
+        }
+
+        @Override
+        public void checkpoint(boolean sync) throws IOException {
+            kahaDB.checkpoint(sync);
+        }
+
+        @Override
+        public long size() {
+            return kahaDB.size();
+        }
+
+        @Override
+        public long getLastProducerSequenceId(ProducerId id) throws 
IOException {
+            return kahaDB.getLastProducerSequenceId(id);
+        }
+        
+    }
+    
+    private class ProxyMessageStoreWithUpdateException extends 
ProxyMessageStore {
+        private boolean throwExceptionOnUpdate;
+        private int numBeforeException = 4;
+        public ProxyMessageStoreWithUpdateException(MessageStore delegate, 
boolean throwExceptionOnUpdate) {
+            super(delegate);
+            this.throwExceptionOnUpdate = throwExceptionOnUpdate;
+        }
+        
+        @Override
+        public void updateMessage(Message message) throws IOException {
+            if(throwExceptionOnUpdate) {
+                if(numBeforeException > 0) {
+                    numBeforeException--;
+                    super.updateMessage(message);
+                } else {
+                    // lets only do it once so we can validate transient store 
failure
+                    throwExceptionOnUpdate = false;
+
+                    //A message that has never been delivered will hit this 
exception
+                    throw new IOException("Hit our simulated exception writing 
the update to disk");
+                }
+            } else {
+                super.updateMessage(message);
+            }
+        }
+    }
+    
+    private class ProxyTopicMessageStoreWithUpdateException extends 
ProxyTopicMessageStore {
+        private boolean throwExceptionOnUpdate;
+        private int numBeforeException = 4;
+        public ProxyTopicMessageStoreWithUpdateException(TopicMessageStore 
delegate, boolean throwExceptionOnUpdate) {
+            super(delegate);
+            this.throwExceptionOnUpdate = throwExceptionOnUpdate;
+        }
+        
+        @Override
+        public void updateMessage(Message message) throws IOException {
+            if(throwExceptionOnUpdate) {
+                if(numBeforeException > 0) {
+                    numBeforeException--;
+                    super.updateMessage(message);
+                } else {
+                    //A message that has never been delivered will hit this 
exception
+                    throw new IOException("Hit our simulated exception writing 
the update to disk");
+                }
+            } else {
+                super.updateMessage(message);
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/SpringTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/SpringTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/SpringTest.java
new file mode 100644
index 0000000..7902baa
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/SpringTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.spring.SpringConsumer;
+import org.apache.activemq.spring.SpringProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.support.AbstractApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+public class SpringTest extends TestCase {
+    
+    private static final Logger LOG = 
LoggerFactory.getLogger(SpringTest.class);
+
+    protected AbstractApplicationContext context;
+    protected SpringConsumer consumer;
+    protected SpringProducer producer;
+
+    public void testSenderWithSpringXml() throws Exception {
+        assertSenderConfig("org/apache/activemq/broker/spring.xml");
+    }
+    /**
+     * assert method that is used by all the test method to send and receive 
messages
+     * based on each spring configuration.
+     *
+     * @param config
+     * @throws Exception
+     */
+    protected void assertSenderConfig(String config) throws Exception {
+        context = new ClassPathXmlApplicationContext(config);
+
+        consumer = (SpringConsumer) context.getBean("consumer");
+        assertTrue("Found a valid consumer", consumer != null);
+
+        consumer.start();
+
+        producer = (SpringProducer) context.getBean("producer");
+        assertTrue("Found a valid producer", producer != null);
+
+        consumer.flushMessages();
+        producer.start();
+
+        // lets sleep a little to give the JMS time to dispatch stuff
+        consumer.waitForMessagesToArrive(producer.getMessageCount());
+
+        // now lets check that the consumer has received some messages
+        List messages = consumer.flushMessages();
+        LOG.info("Consumer has received messages....");
+        for (Iterator iter = messages.iterator(); iter.hasNext();) {
+            Object message = iter.next();
+            LOG.info("Received: " + message);
+        }
+
+        assertEquals("Message count", producer.getMessageCount(), 
messages.size());
+    }
+
+    /**
+     * Clean up method.
+     *
+     * @throws Exception
+     */
+    protected void tearDown() throws Exception {
+        if (consumer != null) {
+            consumer.stop();
+        }
+        if (producer != null) {
+            producer.stop();
+        }
+
+        if (context != null) {
+            context.destroy();
+        }
+    }
+
+    protected void setUp() throws Exception {
+        if (System.getProperty("basedir") == null) {
+            File file = new File(".");
+            System.setProperty("basedir", file.getAbsolutePath());
+        }
+        super.setUp();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/StubBroker.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/StubBroker.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/StubBroker.java
new file mode 100644
index 0000000..7b4fa1b
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/StubBroker.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker;
+
+import java.util.LinkedList;
+import org.apache.activemq.command.ConnectionInfo;
+
+public class StubBroker extends EmptyBroker {
+    public LinkedList<AddConnectionData> addConnectionData = new 
LinkedList<AddConnectionData>();
+    public LinkedList<RemoveConnectionData> removeConnectionData = new 
LinkedList<RemoveConnectionData>();
+
+    public class AddConnectionData {
+        public final ConnectionContext connectionContext;
+        public final ConnectionInfo connectionInfo;
+
+        public AddConnectionData(ConnectionContext context, ConnectionInfo 
info) {
+            connectionContext = context;
+            connectionInfo = info;
+        }
+    }
+
+    public static class RemoveConnectionData {
+        public final ConnectionContext connectionContext;
+        public final ConnectionInfo connectionInfo;
+        public final Throwable error;
+
+        public RemoveConnectionData(ConnectionContext context, ConnectionInfo 
info, Throwable error) {
+            connectionContext = context;
+            connectionInfo = info;
+            this.error = error;
+        }
+    }
+
+    public void addConnection(ConnectionContext context, ConnectionInfo info) 
throws Exception {
+        addConnectionData.add(new AddConnectionData(context, info));
+    }
+
+    public void removeConnection(ConnectionContext context, ConnectionInfo 
info, Throwable error) throws Exception {
+        removeConnectionData.add(new RemoveConnectionData(context, info, 
error));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/StubConnection.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/StubConnection.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/StubConnection.java
new file mode 100644
index 0000000..9a70c4e
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/StubConnection.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.activemq.Service;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.transport.DefaultTransportListener;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.activemq.util.ServiceSupport;
+
+public class StubConnection implements Service {
+
+    private final BlockingQueue<Object> dispatchQueue = new 
LinkedBlockingQueue<Object>();
+    private Connection connection;
+    private Transport transport;
+    private boolean shuttingDown;
+    private TransportListener listener;
+    public AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+
+    public StubConnection(BrokerService broker) throws Exception {
+        this(TransportFactory.connect(broker.getVmConnectorURI()));
+    }
+
+    public StubConnection(Connection connection) {
+        this.connection = connection;
+    }
+
+    public StubConnection(Transport transport) throws Exception {
+        this(transport, null);
+    }
+
+    public StubConnection(Transport transport, TransportListener 
transportListener) throws Exception {
+        listener = transportListener;
+        this.transport = transport;
+        transport.setTransportListener(new DefaultTransportListener() {
+            public void onCommand(Object command) {
+                try {
+                    if (command.getClass() == ShutdownInfo.class) {
+                        shuttingDown = true;
+                    }
+                    StubConnection.this.dispatch(command);
+                } catch (Exception e) {
+                    onException(new IOException("" + e));
+                }
+            }
+
+            public void onException(IOException e) {
+                if (listener != null) {
+                    listener.onException(e);
+                }
+                error.set(e);
+            }
+        });
+        transport.start();
+    }
+
+    protected void dispatch(Object command) throws InterruptedException, 
IOException {
+        if (listener != null) {
+            listener.onCommand(command);
+        }
+        dispatchQueue.put(command);
+    }
+
+    public BlockingQueue<Object> getDispatchQueue() {
+        return dispatchQueue;
+    }
+
+    public void send(Command command) throws Exception {
+        if (command instanceof Message) {
+            Message message = (Message)command;
+            message.setProducerId(message.getMessageId().getProducerId());
+        }
+        command.setResponseRequired(false);
+        if (connection != null) {
+            Response response = connection.service(command);
+            if (response != null && response.isException()) {
+                ExceptionResponse er = (ExceptionResponse)response;
+                throw JMSExceptionSupport.create(er.getException());
+            }
+        } else if (transport != null) {
+            transport.oneway(command);
+        }
+    }
+
+    public Response request(Command command) throws Exception {
+        if (command instanceof Message) {
+            Message message = (Message)command;
+            message.setProducerId(message.getMessageId().getProducerId());
+        }
+        command.setResponseRequired(true);
+        if (connection != null) {
+            Response response = connection.service(command);
+            if (response != null && response.isException()) {
+                ExceptionResponse er = (ExceptionResponse)response;
+                throw JMSExceptionSupport.create(er.getException());
+            }
+            return response;
+        } else if (transport != null) {
+            Response response = (Response)transport.request(command);
+            if (response != null && response.isException()) {
+                ExceptionResponse er = (ExceptionResponse)response;
+                throw JMSExceptionSupport.create(er.getException());
+            }
+            return response;
+        }
+        return null;
+    }
+
+    public Connection getConnection() {
+        return connection;
+    }
+
+    public Transport getTransport() {
+        return transport;
+    }
+
+    public void start() throws Exception {
+    }
+
+    public void stop() throws Exception {
+        shuttingDown = true;
+        if (transport != null) {
+            try {
+                transport.oneway(new ShutdownInfo());
+            } catch (IOException e) {
+            }
+            ServiceSupport.dispose(transport);
+        }
+    }
+
+    public TransportListener getListener() {
+        return listener;
+    }
+
+    public void setListener(TransportListener listener) {
+        this.listener = listener;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
new file mode 100644
index 0000000..61ba79c
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/TopicSubscriptionTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.util.ThreadTracker;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+
+import static org.junit.Assert.*;
+
+
+@RunWith(BlockJUnit4ClassRunner.class)
+public class TopicSubscriptionTest extends QueueSubscriptionTest {
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        durable = true;
+        topic = true;
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+        ThreadTracker.result();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testManyProducersManyConsumers() throws Exception {
+        consumerCount = 40;
+        producerCount = 20;
+        messageCount  = 100;
+        messageSize   = 1; 
+        prefetchCount = 10;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * producerCount * 
consumerCount);
+        assertDestinationMemoryUsageGoesToZero();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws 
Exception {
+        consumerCount = 2;
+        producerCount = 1;
+        messageCount  = 10;
+        messageSize   = 1024 * 1024 * 1; // 1 MB
+        prefetchCount = 1;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * consumerCount * 
producerCount);
+        assertDestinationMemoryUsageGoesToZero();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws 
Exception {
+        consumerCount = 2;
+        producerCount = 1;
+        prefetchCount = 1;
+        messageSize   = 1024;
+        messageCount  = 1000;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * consumerCount * 
producerCount);
+        assertDestinationMemoryUsageGoesToZero();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws 
Exception {
+        consumerCount = 2;
+        producerCount = 1;
+        messageCount  = 1000;
+        messageSize   = 1024;
+        prefetchCount = messageCount * 2;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * consumerCount * 
producerCount);
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws 
Exception {
+        consumerCount = 2;
+        producerCount = 1;
+        messageCount  = 10;
+        messageSize   = 1024 * 1024 * 1; // 1 MB
+        prefetchCount = messageCount * 2;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * consumerCount * 
producerCount);
+        assertDestinationMemoryUsageGoesToZero();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testOneProducerManyConsumersFewMessages() throws Exception {
+        consumerCount = 50;
+        producerCount = 1;
+        messageCount  = 10;
+        messageSize   = 1; // 1 byte
+        prefetchCount = 10;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * consumerCount * 
producerCount);
+        assertDestinationMemoryUsageGoesToZero();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testOneProducerManyConsumersManyMessages() throws Exception {
+        consumerCount = 50;
+        producerCount = 1;
+        messageCount  = 100;
+        messageSize   = 1; // 1 byte
+        prefetchCount = 10;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * consumerCount * 
producerCount);
+        assertDestinationMemoryUsageGoesToZero();
+    }
+
+
+    @Test(timeout = 60 * 1000)
+    public void testManyProducersOneConsumer() throws Exception {
+        consumerCount = 1;
+        producerCount = 20;
+        messageCount  = 100;
+        messageSize   = 1; // 1 byte
+        prefetchCount = 10;
+
+        doMultipleClientsTest();
+
+        assertTotalMessagesReceived(messageCount * producerCount * 
consumerCount);
+        assertDestinationMemoryUsageGoesToZero();
+    }
+
+}

Reply via email to