http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
new file mode 100644
index 0000000..2c41673
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/XARecoveryBrokerTest.java
@@ -0,0 +1,1246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.JMSException;
+import javax.management.InstanceNotFoundException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import junit.framework.Test;
+import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.jmx.PersistenceAdapterViewMBean;
+import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.command.*;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.JMXSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to simulate the recovery that occurs when a broker shuts down.
+ * 
+ * 
+ */
+public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
+    protected static final Logger LOG = 
LoggerFactory.getLogger(XARecoveryBrokerTest.class); // logger used by the tests in this class
+    public boolean prioritySupport = false; // combination value; FALSE/TRUE are registered by initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart
+
+    public void testPreparedJmxView() throws Exception {
+
+        ActiveMQDestination destination = createDestination();
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, 
destination);
+        connection.send(consumerInfo);
+
+        // Prepare 4 message sends.
+        for (int i = 0; i < 4; i++) {
+            // Begin the transaction.
+            XATransactionId txid = createXATransaction(sessionInfo);
+            connection.send(createBeginTransaction(connectionInfo, txid));
+
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            message.setTransactionId(txid);
+            connection.send(message);
+
+            // Prepare
+            connection.send(createPrepareTransaction(connectionInfo, txid));
+        }
+
+        Response response = connection.request(new 
TransactionInfo(connectionInfo.getConnectionId(), null, 
TransactionInfo.RECOVER));
+        assertNotNull(response);
+        DataArrayResponse dar = (DataArrayResponse)response;
+        assertEquals(4, dar.getData().length);
+
+        // view prepared in kahadb view
+        if (broker.getPersistenceAdapter() instanceof 
KahaDBPersistenceAdapter) {
+            PersistenceAdapterViewMBean kahadbView = 
getProxyToPersistenceAdapter(broker.getPersistenceAdapter().toString());
+            String txFromView = kahadbView.getTransactions();
+            LOG.info("Tx view fromm PA:" + txFromView);
+            assertTrue("xid with our dud format in transaction string " + 
txFromView, txFromView.contains("XID:[55,"));
+        }
+
+        // restart the broker.
+        restartBroker();
+
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        connection.send(connectionInfo);
+
+
+        response = connection.request(new 
TransactionInfo(connectionInfo.getConnectionId(), null, 
TransactionInfo.RECOVER));
+        assertNotNull(response);
+        dar = (DataArrayResponse)response;
+        assertEquals(4, dar.getData().length);
+
+        // validate destination depth via jmx
+        DestinationViewMBean destinationView = 
getProxyToDestination(destinationList(destination)[0]);
+        assertEquals("enqueue count does not see prepared", 0, 
destinationView.getQueueSize());
+
+        TransactionId first = (TransactionId)dar.getData()[0];
+        int commitCount = 0;
+        // via jmx, force outcome
+        for (int i = 0; i < 4; i++) {
+            RecoveredXATransactionViewMBean mbean =  
getProxyToPreparedTransactionViewMBean((TransactionId)dar.getData()[i]);
+            if (i%2==0) {
+                mbean.heuristicCommit();
+                commitCount++;
+            } else {
+                mbean.heuristicRollback();
+            }
+        }
+
+        // verify all completed
+        response = connection.request(new 
TransactionInfo(connectionInfo.getConnectionId(), null, 
TransactionInfo.RECOVER));
+        assertNotNull(response);
+        dar = (DataArrayResponse)response;
+        assertEquals(0, dar.getData().length);
+
+        // verify messages available
+        assertEquals("enqueue count reflects outcome", commitCount, 
destinationView.getQueueSize());
+
+        // verify mbeans gone
+        try {
+            RecoveredXATransactionViewMBean gone = 
getProxyToPreparedTransactionViewMBean(first);
+            gone.heuristicRollback();
+            fail("Excepted not found");
+        } catch (InstanceNotFoundException expectedNotfound) {
+        }
+    }
+
+    private PersistenceAdapterViewMBean getProxyToPersistenceAdapter(String 
name) throws MalformedObjectNameException, JMSException {
+       return 
(PersistenceAdapterViewMBean)broker.getManagementContext().newProxyInstance(
+               
BrokerMBeanSupport.createPersistenceAdapterName(broker.getBrokerObjectName().toString(),
 name),
+               PersistenceAdapterViewMBean.class, true);
+    }
+
+    private RecoveredXATransactionViewMBean 
getProxyToPreparedTransactionViewMBean(TransactionId xid) throws 
MalformedObjectNameException, JMSException {
+
+        ObjectName objectName = new 
ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,transactionType=RecoveredXaTransaction,xid="
 +
+                JMXSupport.encodeObjectNamePart(xid.toString()));
+        RecoveredXATransactionViewMBean proxy = 
(RecoveredXATransactionViewMBean) 
broker.getManagementContext().newProxyInstance(objectName,
+                RecoveredXATransactionViewMBean.class, true);
+        return proxy;
+    }
+
+    /**
+     * Creates a JMX proxy for the destination view of the given destination on this broker.
+     *
+     * @param destination destination whose type and physical name form the object name
+     * @return proxy implementing DestinationViewMBean
+     */
+    private DestinationViewMBean getProxyToDestination(ActiveMQDestination destination)
+            throws MalformedObjectNameException, JMSException {
+        // Assemble the object name piece by piece, in the same order as the key list.
+        final StringBuilder jmxName = new StringBuilder("org.apache.activemq:type=Broker,brokerName=");
+        jmxName.append(broker.getBrokerName());
+        jmxName.append(",destinationType=");
+        jmxName.append(JMXSupport.encodeObjectNamePart(destination.getDestinationTypeAsString()));
+        jmxName.append(",destinationName=");
+        jmxName.append(JMXSupport.encodeObjectNamePart(destination.getPhysicalName()));
+        final ObjectName viewName = new ObjectName(jmxName.toString());
+
+        return DestinationViewMBean.class.cast(
+                broker.getManagementContext().newProxyInstance(viewName, DestinationViewMBean.class, true));
+    }
+
+    public void testPreparedTransactionRecoveredOnRestart() throws Exception {
+
+        ActiveMQDestination destination = createDestination();
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, 
destination);
+        connection.send(consumerInfo);
+
+        // Prepare 4 message sends.
+        for (int i = 0; i < 4; i++) {
+            // Begin the transaction.
+            XATransactionId txid = createXATransaction(sessionInfo);
+            connection.send(createBeginTransaction(connectionInfo, txid));
+
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            message.setTransactionId(txid);
+            connection.send(message);
+
+            // Prepare
+            connection.send(createPrepareTransaction(connectionInfo, txid));
+        }
+
+        // Since prepared but not committed.. they should not get delivered.
+        assertNull(receiveMessage(connection));
+        assertNoMessagesLeft(connection);
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        // restart the broker.
+        restartBroker();
+
+        // Setup the consumer and try receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // Since prepared but not committed.. they should not get delivered.
+        assertNull(receiveMessage(connection));
+        assertNoMessagesLeft(connection);
+
+        Response response = connection.request(new 
TransactionInfo(connectionInfo.getConnectionId(), null, 
TransactionInfo.RECOVER));
+        assertNotNull(response);
+        DataArrayResponse dar = (DataArrayResponse)response;
+        assertEquals(4, dar.getData().length);
+
+        // ensure we can close a connection with prepared transactions
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        // open again  to deliver outcome
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // Commit the prepared transactions.
+        for (int i = 0; i < dar.getData().length; i++) {
+            TransactionId transactionId = (TransactionId) dar.getData()[i];
+            LOG.info("commit: " + transactionId);
+            connection.request(createCommitTransaction2Phase(connectionInfo, 
transactionId));
+        }
+
+        // We should get the committed transactions.
+        final int countToReceive = expectedMessageCount(4, destination);
+        for (int i = 0; i < countToReceive ; i++) {
+            Message m = receiveMessage(connection, 
TimeUnit.SECONDS.toMillis(10));
+            LOG.info("received: " + m);
+            assertNotNull("Got non null message: " + i, m);
+        }
+
+        assertNoMessagesLeft(connection);
+        assertEmptyDLQ();
+    }
+
+    private void assertEmptyDLQ() throws Exception {
+        try {
+            DestinationViewMBean destinationView = getProxyToDestination(new 
ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
+            assertEquals("nothing on dlq", 0, destinationView.getQueueSize());
+            assertEquals("nothing added to dlq", 0, 
destinationView.getEnqueueCount());
+        } catch (java.lang.reflect.UndeclaredThrowableException maybeOk) {
+            if (maybeOk.getUndeclaredThrowable() instanceof 
javax.management.InstanceNotFoundException) {
+                // perfect no dlq
+            } else {
+                throw maybeOk;
+            }
+        }
+    }
+
+    public void testPreparedInterleavedTransactionRecoveredOnRestart() throws 
Exception {
+
+        ActiveMQDestination destination = createDestination();
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, 
destination);
+        connection.send(consumerInfo);
+
+        // Prepare 4 message sends.
+        for (int i = 0; i < 4; i++) {
+            // Begin the transaction.
+            XATransactionId txid = createXATransaction(sessionInfo);
+            connection.send(createBeginTransaction(connectionInfo, txid));
+
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            message.setTransactionId(txid);
+            connection.send(message);
+
+            // Prepare
+            connection.send(createPrepareTransaction(connectionInfo, txid));
+        }
+
+        // Since prepared but not committed.. they should not get delivered.
+        assertNull(receiveMessage(connection));
+        assertNoMessagesLeft(connection);
+
+        // send non tx message
+        Message message = createMessage(producerInfo, destination);
+        message.setPersistent(true);
+        connection.request(message);
+
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        // restart the broker.
+        restartBroker();
+
+        // Setup the consumer and try receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // consume non transacted message, but don't ack
+        int countToReceive = expectedMessageCount(1, destination);
+        for (int i=0; i< countToReceive; i++) {
+            Message m = receiveMessage(connection, 
TimeUnit.SECONDS.toMillis(10));
+            LOG.info("received: " + m);
+            assertNotNull("got non tx message after prepared", m);
+        }
+
+        // Since prepared but not committed.. they should not get delivered.
+        assertNull(receiveMessage(connection));
+        assertNoMessagesLeft(connection);
+
+        Response response = connection.request(new 
TransactionInfo(connectionInfo.getConnectionId(), null, 
TransactionInfo.RECOVER));
+        assertNotNull(response);
+        DataArrayResponse dar = (DataArrayResponse)response;
+        assertEquals(4, dar.getData().length);
+
+        // ensure we can close a connection with prepared transactions
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        // open again  to deliver outcome
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+
+        // Commit the prepared transactions.
+        for (int i = 0; i < dar.getData().length; i++) {
+            TransactionId transactionId = (TransactionId) dar.getData()[i];
+            LOG.info("commit: " + transactionId);
+            connection.request(createCommitTransaction2Phase(connectionInfo, 
transactionId));
+        }
+
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // We should get the committed transactions and the non tx message
+        countToReceive = expectedMessageCount(5, destination);
+        for (int i = 0; i < countToReceive ; i++) {
+            Message m = receiveMessage(connection, 
TimeUnit.SECONDS.toMillis(10));
+            LOG.info("received: " + m);
+            assertNotNull("Got non null message: " + i, m);
+        }
+
+        assertNoMessagesLeft(connection);
+        assertEmptyDLQ();
+    }
+
+    /**
+     * Topic variant of the prepared-transaction recovery test: a durable subscription
+     * (client id and subscription name "durable") on topic "TryTopic" must not see the
+     * four prepared sends before or after a broker restart, and must receive them once
+     * the recovered transactions are committed (two-phase commit).
+     *
+     * @throws Exception if a broker command or the restart fails
+     */
+    public void testTopicPreparedTransactionRecoveredOnRestart() throws Exception {
+        ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
+
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        connectionInfo.setClientId("durable");
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        consumerInfo.setSubscriptionName("durable");
+        connection.send(consumerInfo);
+
+        // Prepare 4 message sends.
+        for (int i = 0; i < 4; i++) {
+            // Begin the transaction.
+            XATransactionId txid = createXATransaction(sessionInfo);
+            connection.send(createBeginTransaction(connectionInfo, txid));
+
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            message.setTransactionId(txid);
+            connection.send(message);
+
+            // Prepare
+            connection.send(createPrepareTransaction(connectionInfo, txid));
+        }
+
+        // Since prepared but not committed.. they should not get delivered.
+        assertNull(receiveMessage(connection));
+        assertNoMessagesLeft(connection);
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        // restart the broker.
+        restartBroker();
+
+        // Setup the consumer and try receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        connectionInfo.setClientId("durable");
+
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        consumerInfo.setSubscriptionName("durable");
+        connection.send(consumerInfo);
+
+        // Since prepared but not committed.. they should not get delivered.
+        assertNull(receiveMessage(connection));
+        assertNoMessagesLeft(connection);
+
+        Response response = connection.request(
+                new TransactionInfo(connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER));
+        assertNotNull(response);
+        DataArrayResponse dar = (DataArrayResponse) response;
+        assertEquals(4, dar.getData().length);
+
+        // ensure we can close a connection with prepared transactions
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        // open again to deliver outcome
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        connectionInfo.setClientId("durable");
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        consumerInfo.setSubscriptionName("durable");
+        connection.send(consumerInfo);
+
+        // Commit the prepared transactions.
+        for (int i = 0; i < dar.getData().length; i++) {
+            connection.request(createCommitTransaction2Phase(connectionInfo, (TransactionId) dar.getData()[i]));
+        }
+
+        // We should get the committed transactions. The expected count is computed once,
+        // as in the queue variants of this test, instead of on every loop iteration.
+        final int countToReceive = expectedMessageCount(4, destination);
+        for (int i = 0; i < countToReceive; i++) {
+            Message m = receiveMessage(connection, TimeUnit.SECONDS.toMillis(10));
+            assertNotNull("Got non null message: " + i, m);
+        }
+
+        assertNoMessagesLeft(connection);
+
+    }
+
+    /**
+     * Sends four persistent messages inside one XA transaction, commits it in one phase,
+     * restarts the broker and checks that the expected number of messages is delivered
+     * to a new consumer, with nothing left over.
+     *
+     * @throws Exception if a broker command or the restart fails
+     */
+    public void testQueuePersistentCommitedMessagesNotLostOnRestart() throws Exception {
+
+        ActiveMQDestination destination = createDestination();
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        // Begin the transaction.
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        for (int i = 0; i < 4; i++) {
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            message.setTransactionId(txid);
+            connection.send(message);
+        }
+
+        // Commit
+        connection.send(createCommitTransaction1Phase(connectionInfo, txid));
+        connection.request(closeConnectionInfo(connectionInfo));
+        // restart the broker.
+        restartBroker();
+
+        // Setup the consumer and receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // The expected count is computed once rather than on every loop iteration.
+        final int countToReceive = expectedMessageCount(4, destination);
+        for (int i = 0; i < countToReceive; i++) {
+            Message m = receiveMessage(connection);
+            assertNotNull("Got non null message: " + i, m);
+        }
+
+        assertNoMessagesLeft(connection);
+    }
+
+    /**
+     * Sends four persistent messages inside one XA transaction, completes it with
+     * prepare followed by a two-phase commit, restarts the broker and checks that the
+     * expected number of messages is delivered to a new consumer, with nothing left over.
+     *
+     * @throws Exception if a broker command or the restart fails
+     */
+    public void testQueuePersistentCommited2PhaseMessagesNotLostOnRestart() throws Exception {
+
+        ActiveMQDestination destination = createDestination();
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        // Begin the transaction.
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        for (int i = 0; i < 4; i++) {
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            message.setTransactionId(txid);
+            connection.send(message);
+        }
+
+        // Commit 2 phase
+        connection.request(createPrepareTransaction(connectionInfo, txid));
+        connection.send(createCommitTransaction2Phase(connectionInfo, txid));
+
+        connection.request(closeConnectionInfo(connectionInfo));
+        // restart the broker.
+        restartBroker();
+
+        // Setup the consumer and receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // The expected count is computed once rather than on every loop iteration.
+        final int countToReceive = expectedMessageCount(4, destination);
+        for (int i = 0; i < countToReceive; i++) {
+            Message m = receiveMessage(connection);
+            assertNotNull("Got non null message: " + i, m);
+        }
+
+        assertNoMessagesLeft(connection);
+    }
+
+    public void testQueuePersistentCommitedAcksNotLostOnRestart() throws 
Exception {
+
+        ActiveMQDestination destination = createDestination();
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        for (int i = 0; i < 4; i++) {
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            connection.send(message);
+        }
+
+        // Begin the transaction.
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        ConsumerInfo consumerInfo;
+        Message m = null;
+        for (ActiveMQDestination dest : destinationList(destination)) {
+            // Setup the consumer and receive the message.
+            consumerInfo = createConsumerInfo(sessionInfo, dest);
+            connection.send(consumerInfo);
+
+            for (int i = 0; i < 4; i++) {
+                m = receiveMessage(connection);
+                assertNotNull(m);
+            }
+
+            MessageAck ack = createAck(consumerInfo, m, 4, 
MessageAck.STANDARD_ACK_TYPE);
+            ack.setTransactionId(txid);
+            connection.send(ack);
+        }
+
+        // Commit
+        connection.request(createCommitTransaction1Phase(connectionInfo, 
txid));
+
+        // restart the broker.
+        restartBroker();
+
+        // Setup the consumer and receive the message.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // No messages should be delivered.
+        assertNoMessagesLeft(connection);
+
+        m = receiveMessage(connection);
+        assertNull(m);
+    }
+    
+    public void testQueuePersistentPreparedAcksNotLostOnRestart() throws 
Exception {
+
+        ActiveMQDestination destination = createDestination();
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        for (int i = 0; i < 4; i++) {
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            connection.send(message);
+        }
+
+        // Begin the transaction.
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        ConsumerInfo consumerInfo;
+        Message m = null;
+        for (ActiveMQDestination dest : destinationList(destination)) {
+            // Setup the consumer and receive the message.
+            consumerInfo = createConsumerInfo(sessionInfo, dest);
+            connection.send(consumerInfo);
+
+            for (int i = 0; i < 4; i++) {
+                m = receiveMessage(connection);
+                assertNotNull(m);
+            }
+
+            // one ack with last received, mimic a beforeEnd synchronization
+            MessageAck ack = createAck(consumerInfo, m, 4, 
MessageAck.STANDARD_ACK_TYPE);
+            ack.setTransactionId(txid);
+            connection.send(ack);
+        }
+
+        connection.request(createPrepareTransaction(connectionInfo, txid));
+
+        // restart the broker.
+        restartBroker();
+
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        connection.send(connectionInfo);
+
+        // validate recovery
+        TransactionInfo recoverInfo = new 
TransactionInfo(connectionInfo.getConnectionId(), null, 
TransactionInfo.RECOVER);
+        DataArrayResponse dataArrayResponse = 
(DataArrayResponse)connection.request(recoverInfo);
+
+        assertEquals("there is a prepared tx", 1, 
dataArrayResponse.getData().length);
+        assertEquals("it matches", txid, dataArrayResponse.getData()[0]);
+
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+        
+        // no redelivery, exactly once semantics unless there is rollback
+        m = receiveMessage(connection);
+        assertNull(m);
+        assertNoMessagesLeft(connection);
+
+        // validate destination depth via jmx
+        DestinationViewMBean destinationView = 
getProxyToDestination(destinationList(destination)[0]);
+        assertEquals("enqueue count does not see prepared acks", 4, 
destinationView.getQueueSize());
+        assertEquals("enqueue count does not see prepared acks", 0, 
destinationView.getDequeueCount());
+
+        connection.request(createCommitTransaction2Phase(connectionInfo, 
txid));
+
+        // validate recovery complete
+        dataArrayResponse = (DataArrayResponse)connection.request(recoverInfo);
+        assertEquals("there are no prepared tx", 0, 
dataArrayResponse.getData().length);
+
+        assertEquals("enqueue count does not see commited acks", 0, 
destinationView.getQueueSize());
+        assertEquals("enqueue count does not see commited acks", 4, 
destinationView.getDequeueCount());
+
+    }
+
+    /**
+     * Registers both boolean values as combination values for the "prioritySupport" field.
+     */
+    public void initCombosForTestTopicPersistentPreparedAcksNotLostOnRestart() {
+        final Boolean[] bothSettings = {Boolean.FALSE, Boolean.TRUE};
+        addCombinationValues("prioritySupport", bothSettings);
+    }
+
+    /**
+     * Publishes persistent messages to a durable topic subscription, acks them inside
+     * an XA transaction that is prepared but not completed, restarts the broker and
+     * then checks that the prepared transaction is reported by RECOVER, that nothing
+     * is redelivered while it is still prepared, and that RECOVER reports no
+     * transaction once the two-phase commit has been sent.
+     */
+    public void testTopicPersistentPreparedAcksNotLostOnRestart() throws Exception {
+        ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
+
+        // Connection under client id "durable", plus its session and producer.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        connectionInfo.setClientId("durable");
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        // Durable subscription named "durable" on the topic.
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        consumerInfo.setSubscriptionName("durable");
+        connection.send(consumerInfo);
+
+        // Publish the persistent messages.
+        final int numMessages = 4;
+        for (int sent = 0; sent < numMessages; sent++) {
+            Message outbound = createMessage(producerInfo, destination);
+            outbound.setPersistent(true);
+            connection.send(outbound);
+        }
+
+        // Open the XA transaction the ack will be enlisted in.
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        final int messageCount = expectedMessageCount(numMessages, destination);
+        Message lastReceived = null;
+        for (int received = 0; received < messageCount; received++) {
+            lastReceived = receiveMessage(connection);
+            assertNotNull("unexpected null on: " + received, lastReceived);
+        }
+
+        // A single ack built from the last received message, mimicking a
+        // beforeEnd synchronization.
+        MessageAck txAck = createAck(consumerInfo, lastReceived, messageCount,
+                MessageAck.STANDARD_ACK_TYPE);
+        txAck.setTransactionId(txid);
+        connection.send(txAck);
+
+        connection.request(createPrepareTransaction(connectionInfo, txid));
+
+        // Restart the broker while the transaction is only prepared.
+        restartBroker();
+
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        connectionInfo.setClientId("durable");
+        connection.send(connectionInfo);
+
+        // RECOVER must report exactly the prepared transaction.
+        TransactionInfo recoverInfo = new TransactionInfo(
+                connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER);
+        DataArrayResponse recovered = (DataArrayResponse) connection.request(recoverInfo);
+
+        assertEquals("there is a prepared tx", 1, recovered.getData().length);
+        assertEquals("it matches", txid, recovered.getData()[0]);
+
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        consumerInfo.setSubscriptionName("durable");
+        connection.send(consumerInfo);
+
+        // No redelivery: exactly-once semantics unless there is a rollback.
+        lastReceived = receiveMessage(connection);
+        assertNull(lastReceived);
+        assertNoMessagesLeft(connection);
+
+        connection.request(createCommitTransaction2Phase(connectionInfo, txid));
+
+        // After the commit there must be nothing left to recover.
+        recovered = (DataArrayResponse) connection.request(recoverInfo);
+        assertEquals("there are no prepared tx", 0, recovered.getData().length);
+    }
+
+    /**
+     * Sends persistent messages to the test destination, acks them in an XA
+     * transaction that is prepared, restarts the broker and verifies that the
+     * prepared transaction is recovered and nothing is redelivered while it is
+     * prepared. The transaction is then rolled back, the messages are consumed
+     * again in a fresh transaction that is committed in one phase, and RECOVER is
+     * expected to report no prepared transaction.
+     */
+    public void testQueuePersistentPreparedAcksAvailableAfterRestartAndRollback()
+            throws Exception {
+
+        ActiveMQDestination destination = createDestination();
+
+        // Connection, session and producer.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        int numMessages = 4;
+        for (int sent = 0; sent < numMessages; sent++) {
+            Message outbound = createMessage(producerInfo, destination);
+            outbound.setPersistent(true);
+            connection.send(outbound);
+        }
+
+        // Open the XA transaction.
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        ConsumerInfo consumerInfo;
+        Message lastReceived = null;
+        for (ActiveMQDestination target : destinationList(destination)) {
+            // One consumer per destination; drain its messages.
+            consumerInfo = createConsumerInfo(sessionInfo, target);
+            connection.send(consumerInfo);
+
+            for (int received = 0; received < numMessages; received++) {
+                lastReceived = receiveMessage(connection);
+                assertNotNull(lastReceived);
+            }
+
+            // A single ack built from the last received message, mimicking a
+            // beforeEnd synchronization.
+            MessageAck txAck = createAck(consumerInfo, lastReceived, numMessages,
+                    MessageAck.STANDARD_ACK_TYPE);
+            txAck.setTransactionId(txid);
+            connection.send(txAck);
+        }
+
+        connection.request(createPrepareTransaction(connectionInfo, txid));
+
+        // Restart the broker while the transaction is only prepared.
+        restartBroker();
+
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        connection.send(connectionInfo);
+
+        // RECOVER must report exactly the prepared transaction.
+        TransactionInfo recoverInfo = new TransactionInfo(
+                connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER);
+        DataArrayResponse recovered = (DataArrayResponse) connection.request(recoverInfo);
+
+        assertEquals("there is a prepared tx", 1, recovered.getData().length);
+        assertEquals("it matches", txid, recovered.getData()[0]);
+
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        connection.send(consumerInfo);
+
+        // No redelivery: exactly-once semantics while prepared.
+        lastReceived = receiveMessage(connection);
+        assertNull(lastReceived);
+        assertNoMessagesLeft(connection);
+
+        // Roll back so that the messages are redelivered.
+        connection.request(createRollbackTransaction(connectionInfo, txid));
+
+        LOG.info("new tx for redelivery");
+        txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        for (ActiveMQDestination target : destinationList(destination)) {
+            // Fresh consumer per destination for the redelivered messages.
+            consumerInfo = createConsumerInfo(sessionInfo, target);
+            connection.send(consumerInfo);
+
+            for (int received = 0; received < numMessages; received++) {
+                lastReceived = receiveMessage(connection);
+                assertNotNull("unexpected null on:" + received, lastReceived);
+            }
+            MessageAck txAck = createAck(consumerInfo, lastReceived, numMessages,
+                    MessageAck.STANDARD_ACK_TYPE);
+            txAck.setTransactionId(txid);
+            connection.send(txAck);
+        }
+
+        // One-phase commit of the redelivery transaction.
+        connection.request(createCommitTransaction1Phase(connectionInfo, txid));
+
+        // Nothing should be left to recover.
+        recovered = (DataArrayResponse) connection.request(recoverInfo);
+        assertEquals("there are no prepared tx", 0, recovered.getData().length);
+    }
+
+    public void 
testQueuePersistentPreparedAcksAvailableAfterRollbackPrefetchOne() throws 
Exception {
+
+        ActiveMQDestination destination = createDestination();
+
+        // Setup the producer and send the message.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        int numMessages = 1;
+        for (int i = 0; i < numMessages; i++) {
+            Message message = createMessage(producerInfo, destination);
+            message.setPersistent(true);
+            connection.send(message);
+        }
+
+        final int messageCount = expectedMessageCount(numMessages, 
destination);
+
+        // Begin the transaction.
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        // use consumer per destination for the composite dest case
+        // bc the same composite dest is used for sending so there
+        // will be duplicate message ids in the mix which a single
+        // consumer (PrefetchSubscription) cannot handle in a tx
+        // atm. The matching is based on messageId rather than messageId
+        // and destination
+        Set<ConsumerInfo> consumerInfos = new HashSet<ConsumerInfo>();
+        for (ActiveMQDestination dest : destinationList(destination)) {
+            ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, dest);
+            consumerInfo.setPrefetchSize(numMessages);
+            consumerInfos.add(consumerInfo);
+        }
+
+        for (ConsumerInfo info : consumerInfos) {
+            connection.send(info);
+        }
+
+        Message message = null;
+        for (ConsumerInfo info : consumerInfos) {
+            for (int i = 0; i < numMessages; i++) {
+               message = receiveMessage(connection);
+               assertNotNull(message);
+               connection.send(createAck(info, message, 1, 
MessageAck.DELIVERED_ACK_TYPE));
+            }
+            MessageAck ack = createAck(info, message, numMessages, 
MessageAck.STANDARD_ACK_TYPE);
+            ack.setTransactionId(txid);
+            connection.send(ack);
+        }
+        connection.request(createPrepareTransaction(connectionInfo, txid));
+
+        // reconnect
+        connection.send(connectionInfo.createRemoveCommand());
+        connection = createConnection();
+        connection.send(connectionInfo);
+
+        // validate recovery
+        TransactionInfo recoverInfo = new 
TransactionInfo(connectionInfo.getConnectionId(), null, 
TransactionInfo.RECOVER);
+        DataArrayResponse dataArrayResponse = (DataArrayResponse) 
connection.request(recoverInfo);
+
+        assertEquals("there is a prepared tx", 1, 
dataArrayResponse.getData().length);
+        assertEquals("it matches", txid, dataArrayResponse.getData()[0]);
+
+        connection.send(sessionInfo);
+
+        for (ConsumerInfo info : consumerInfos) {
+            connection.send(info);
+        }
+
+        // no redelivery, exactly once semantics while prepared
+        message = receiveMessage(connection);
+        assertNull(message);
+        assertNoMessagesLeft(connection);
+
+        // rollback so we get redelivery
+        connection.request(createRollbackTransaction(connectionInfo, txid));
+
+        LOG.info("new tx for redelivery");
+        txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        for (ConsumerInfo info : consumerInfos) {
+            for (int i = 0; i < numMessages; i++) {
+                message = receiveMessage(connection);
+                assertNotNull("unexpected null on:" + i, message);
+                MessageAck ack = createAck(info, message, 1, 
MessageAck.STANDARD_ACK_TYPE);
+                ack.setTransactionId(txid);
+                connection.send(ack);
+            }
+        }
+
+        // Commit
+        connection.request(createCommitTransaction1Phase(connectionInfo, 
txid));
+
+        // validate recovery complete
+        dataArrayResponse = (DataArrayResponse) 
connection.request(recoverInfo);
+        assertEquals("there are no prepared tx", 0, 
dataArrayResponse.getData().length);
+    }
+
+    /**
+     * Registers {@code FALSE} and {@code TRUE} as the values of the
+     * "prioritySupport" combination attribute. By its name this initialiser is
+     * paired with testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback.
+     */
+    public void initCombosForTestTopicPersistentPreparedAcksAvailableAfterRestartAndRollback() {
+        final Boolean[] prioritySupportOptions = new Boolean[]{Boolean.FALSE, Boolean.TRUE};
+        addCombinationValues("prioritySupport", prioritySupportOptions);
+    }
+
+    /**
+     * Durable-topic scenario: messages are acked inside a prepared XA transaction,
+     * the broker is restarted, and the test verifies that the prepared transaction
+     * is recovered with no redelivery while it is prepared. After a rollback the
+     * messages are consumed again in a new transaction that is committed in one
+     * phase, and RECOVER is expected to report no prepared transaction.
+     */
+    public void testTopicPersistentPreparedAcksAvailableAfterRestartAndRollback()
+            throws Exception {
+
+        ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
+
+        // Connection under client id "durable", plus its session and producer.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        connectionInfo.setClientId("durable");
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        // Durable subscription named "durable" on the topic.
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        consumerInfo.setSubscriptionName("durable");
+        connection.send(consumerInfo);
+
+        int numMessages = 4;
+        for (int sent = 0; sent < numMessages; sent++) {
+            Message outbound = createMessage(producerInfo, destination);
+            outbound.setPersistent(true);
+            connection.send(outbound);
+        }
+
+        // Open the XA transaction.
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        Message lastReceived = null;
+        for (int received = 0; received < numMessages; received++) {
+            lastReceived = receiveMessage(connection);
+            assertNotNull(lastReceived);
+        }
+
+        // A single ack built from the last received message, mimicking a
+        // beforeEnd synchronization.
+        MessageAck txAck = createAck(consumerInfo, lastReceived, numMessages,
+                MessageAck.STANDARD_ACK_TYPE);
+        txAck.setTransactionId(txid);
+        connection.send(txAck);
+
+        connection.request(createPrepareTransaction(connectionInfo, txid));
+
+        // Restart the broker while the transaction is only prepared.
+        restartBroker();
+
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        connectionInfo.setClientId("durable");
+        connection.send(connectionInfo);
+
+        // RECOVER must report exactly the prepared transaction.
+        TransactionInfo recoverInfo = new TransactionInfo(
+                connectionInfo.getConnectionId(), null, TransactionInfo.RECOVER);
+        DataArrayResponse recovered = (DataArrayResponse) connection.request(recoverInfo);
+
+        assertEquals("there is a prepared tx", 1, recovered.getData().length);
+        assertEquals("it matches", txid, recovered.getData()[0]);
+
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(sessionInfo);
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        consumerInfo.setSubscriptionName("durable");
+        connection.send(consumerInfo);
+
+        // No redelivery: exactly-once semantics while prepared.
+        lastReceived = receiveMessage(connection);
+        assertNull(lastReceived);
+        assertNoMessagesLeft(connection);
+
+        // Roll back so that the messages are redelivered.
+        connection.request(createRollbackTransaction(connectionInfo, txid));
+
+        LOG.info("new tx for redelivery");
+        txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        for (int received = 0; received < numMessages; received++) {
+            lastReceived = receiveMessage(connection);
+            assertNotNull("unexpected null on:" + received, lastReceived);
+        }
+        txAck = createAck(consumerInfo, lastReceived, numMessages,
+                MessageAck.STANDARD_ACK_TYPE);
+        txAck.setTransactionId(txid);
+        connection.send(txAck);
+
+        // One-phase commit of the redelivery transaction.
+        connection.request(createCommitTransaction1Phase(connectionInfo, txid));
+
+        // Nothing should be left to recover.
+        recovered = (DataArrayResponse) connection.request(recoverInfo);
+        assertEquals("there are no prepared tx", 0, recovered.getData().length);
+    }
+
+    /**
+     * Registers {@code FALSE} and {@code TRUE} as the values of the
+     * "prioritySupport" combination attribute. By its name this initialiser is
+     * paired with testTopicPersistentPreparedAcksAvailableAfterRollback.
+     */
+    public void initCombosForTestTopicPersistentPreparedAcksAvailableAfterRollback() {
+        final Boolean[] prioritySupportOptions = new Boolean[]{Boolean.FALSE, Boolean.TRUE};
+        addCombinationValues("prioritySupport", prioritySupportOptions);
+    }
+
+    /**
+     * Durable-topic scenario without a broker restart: messages are acked inside an
+     * XA transaction that is prepared and then rolled back. The connection info is
+     * closed and a new connection info, session and durable consumer are registered
+     * on the same stub connection; the messages must then be receivable again and
+     * are acked in a new transaction that is committed in one phase.
+     */
+    public void testTopicPersistentPreparedAcksAvailableAfterRollback() throws Exception {
+
+        ActiveMQDestination destination = new ActiveMQTopic("TryTopic");
+
+        // Connection under client id "durable", plus its session and producer.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        connectionInfo.setClientId("durable");
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        // Durable subscription named "durable" on the topic.
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
+        consumerInfo.setSubscriptionName("durable");
+        connection.send(consumerInfo);
+
+        int numMessages = 4;
+        for (int sent = 0; sent < numMessages; sent++) {
+            Message outbound = createMessage(producerInfo, destination);
+            outbound.setPersistent(true);
+            connection.send(outbound);
+        }
+
+        // Open the XA transaction.
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        Message lastReceived = null;
+        for (int received = 0; received < numMessages; received++) {
+            lastReceived = receiveMessage(connection);
+            assertNotNull(lastReceived);
+        }
+
+        // A single ack built from the last received message, mimicking a
+        // beforeEnd synchronization.
+        MessageAck txAck = createAck(consumerInfo, lastReceived, numMessages,
+                MessageAck.STANDARD_ACK_TYPE);
+        txAck.setTransactionId(txid);
+        connection.send(txAck);
+
+        connection.request(createPrepareTransaction(connectionInfo, txid));
+
+        // Roll back so that the messages are redelivered.
+        connection.request(createRollbackTransaction(connectionInfo, txid));
+
+        LOG.info("new consumer/tx for redelivery");
+        connection.request(closeConnectionInfo(connectionInfo));
+
+        connectionInfo = createConnectionInfo();
+        connectionInfo.setClientId("durable");
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+
+        // Re-register the durable subscription under the new session.
+        consumerInfo = createConsumerInfo(sessionInfo, destination);
+        consumerInfo.setSubscriptionName("durable");
+        connection.send(consumerInfo);
+
+        txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        for (int received = 0; received < numMessages; received++) {
+            lastReceived = receiveMessage(connection);
+            assertNotNull("unexpected null on:" + received, lastReceived);
+        }
+        txAck = createAck(consumerInfo, lastReceived, numMessages,
+                MessageAck.STANDARD_ACK_TYPE);
+        txAck.setTransactionId(txid);
+        connection.send(txAck);
+
+        // One-phase commit of the redelivery transaction.
+        connection.request(createCommitTransaction1Phase(connectionInfo, txid));
+    }
+
+    /**
+     * Expands a destination into the list of destinations to consume from.
+     *
+     * @param dest the destination to expand
+     * @return the composite members when {@code dest} is composite, otherwise a
+     *         one-element array holding {@code dest} itself
+     */
+    private ActiveMQDestination[] destinationList(ActiveMQDestination dest) {
+        if (dest.isComposite()) {
+            return dest.getCompositeDestinations();
+        }
+        return new ActiveMQDestination[]{dest};
+    }
+
+    /**
+     * Scales a per-destination message count by the number of destinations.
+     *
+     * @param i the number of messages sent
+     * @param destination the destination the messages were sent to
+     * @return {@code i} multiplied by the number of composite members, or by 1
+     *         when the destination is not composite
+     */
+    private int expectedMessageCount(int i, ActiveMQDestination destination) {
+        int destinationCount = 1;
+        if (destination.isComposite()) {
+            destinationCount = destination.getCompositeDestinations().length;
+        }
+        return i * destinationCount;
+    }
+
+    /**
+     * Acks persistent messages inside an XA transaction that is neither prepared
+     * nor committed, restarts the broker, and verifies that all of the messages
+     * can be received again and that nothing further is left on the connection.
+     */
+    public void testQueuePersistentUncommittedAcksLostOnRestart() throws Exception {
+
+        // Number of messages sent and expected back per destination.
+        final int numMessages = 4;
+
+        ActiveMQDestination destination = createDestination();
+
+        // Connection, session and producer.
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        for (int sent = 0; sent < numMessages; sent++) {
+            Message outbound = createMessage(producerInfo, destination);
+            outbound.setPersistent(true);
+            connection.send(outbound);
+        }
+
+        // Open the XA transaction.
+        XATransactionId txid = createXATransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        Message lastReceived = null;
+        for (ActiveMQDestination target : destinationList(destination)) {
+            // One consumer per destination; drain its messages.
+            ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, target);
+            connection.send(consumerInfo);
+
+            for (int received = 0; received < numMessages; received++) {
+                lastReceived = receiveMessage(connection);
+                assertNotNull(lastReceived);
+            }
+            MessageAck txAck = createAck(consumerInfo, lastReceived, numMessages,
+                    MessageAck.STANDARD_ACK_TYPE);
+            txAck.setTransactionId(txid);
+            connection.request(txAck);
+        }
+
+        // Deliberately no prepare and no commit.
+
+        // Restart the broker with the transaction still open.
+        restartBroker();
+
+        // Fresh connection and session after the restart.
+        connection = createConnection();
+        connectionInfo = createConnectionInfo();
+        sessionInfo = createSessionInfo(connectionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+
+        for (ActiveMQDestination target : destinationList(destination)) {
+            // The messages must be delivered again to a new consumer.
+            ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, target);
+            connection.send(consumerInfo);
+
+            for (int received = 0; received < numMessages; received++) {
+                lastReceived = receiveMessage(connection);
+                assertNotNull(lastReceived);
+            }
+        }
+
+        assertNoMessagesLeft(connection);
+    }
+
+    @Override
+    protected PolicyEntry getDefaultPolicy() {
+        PolicyEntry policyEntry = super.getDefaultPolicy();
+        policyEntry.setPrioritizedMessages(prioritySupport);
+        return policyEntry;
+    }
+
+    /**
+     * Builds the test suite for this class by delegating to {@code suite(Class)}.
+     *
+     * @return the suite for {@code XARecoveryBrokerTest}
+     */
+    public static Test suite() {
+        final Test allTests = suite(XARecoveryBrokerTest.class);
+        return allTests;
+    }
+
+    /**
+     * Command-line entry point: runs {@link #suite()} with the JUnit text runner.
+     *
+     * @param args ignored
+     */
+    public static void main(String[] args) {
+        final Test allTests = suite();
+        junit.textui.TestRunner.run(allTests);
+    }
+
+    protected ActiveMQDestination createDestination() {
+        return new ActiveMQQueue(getClass().getName() + "." + getName());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
new file mode 100644
index 0000000..c65dc53
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.advisory;
+
+import junit.framework.Test;
+
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerTestSupport;
+import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.SessionInfo;
+
+public class AdvisoryBrokerTest extends BrokerTestSupport {
+     
+    /**
+     * Subscribes to the connection advisory topic and expects, in order: an
+     * advisory carrying the subscribing connection's own id, an advisory carrying
+     * the id of a second connection once it is registered, and a
+     * {@link RemoveInfo} for the second connection once it is closed.
+     */
+    public void testConnectionAdvisories() throws Exception {
+
+        ActiveMQDestination destination = AdvisorySupport.getConnectionAdvisoryTopic();
+
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        consumerInfo1.setPrefetchSize(100);
+
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(consumerInfo1);
+
+        // We should get an advisory of our own connection.
+        Message m1 = receiveMessage(connection1);
+        assertNotNull(m1);
+        assertNotNull(m1.getDataStructure());
+        // Expected value first, so a failure reports the two ids the right way round.
+        assertEquals(connectionInfo1.getConnectionId(),
+                ((ConnectionInfo) m1.getDataStructure()).getConnectionId());
+
+        // Setup a second connection
+        StubConnection connection2 = createConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        connection2.send(connectionInfo2);
+
+        // We should get an advisory of the second connection.
+        m1 = receiveMessage(connection1);
+        assertNotNull(m1);
+        assertNotNull(m1.getDataStructure());
+        assertEquals(connectionInfo2.getConnectionId(),
+                ((ConnectionInfo) m1.getDataStructure()).getConnectionId());
+
+        // Close the second connection.
+        connection2.send(closeConnectionInfo(connectionInfo2));
+        connection2.stop();
+
+        // We should get an advisory of the second connection closing
+        m1 = receiveMessage(connection1);
+        assertNotNull(m1);
+        assertNotNull(m1.getDataStructure());
+        RemoveInfo r = (RemoveInfo) m1.getDataStructure();
+        assertEquals(connectionInfo2.getConnectionId(), r.getObjectId());
+
+        assertNoMessagesLeft(connection1);
+    }
+
+    /**
+     * Subscribes to the consumer advisory topic of queue "test" and expects no
+     * advisory for the advisory consumer itself, an advisory carrying the id of a
+     * queue consumer created on a second connection, and a {@link RemoveInfo} for
+     * that consumer once the second connection is closed.
+     */
+    public void testConsumerAdvisories() throws Exception {
+
+        ActiveMQDestination queue = new ActiveMQQueue("test");
+        ActiveMQDestination destination = AdvisorySupport.getConsumerAdvisoryTopic(queue);
+
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        consumerInfo1.setPrefetchSize(100);
+
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(consumerInfo1);
+
+        // We should not see an advisory for the advisory consumer.
+        assertNoMessagesLeft(connection1);
+
+        // Setup a second consumer.
+        StubConnection connection2 = createConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, queue);
+        // Configure the second consumer here; the prefetch of consumerInfo1 was
+        // already set above and that consumer has already been sent.
+        consumerInfo2.setPrefetchSize(100);
+
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+        connection2.send(consumerInfo2);
+
+        // We should get an advisory of the new consumer.
+        Message m1 = receiveMessage(connection1);
+        assertNotNull(m1);
+        assertNotNull(m1.getDataStructure());
+        // Expected value first, so a failure reports the two ids the right way round.
+        assertEquals(consumerInfo2.getConsumerId(),
+                ((ConsumerInfo) m1.getDataStructure()).getConsumerId());
+
+        // Close the second connection.
+        connection2.request(closeConnectionInfo(connectionInfo2));
+        connection2.stop();
+
+        // We should get an advisory of the consumer closing
+        m1 = receiveMessage(connection1);
+        assertNotNull(m1);
+        assertNotNull(m1.getDataStructure());
+        RemoveInfo r = (RemoveInfo) m1.getDataStructure();
+        assertEquals(consumerInfo2.getConsumerId(), r.getObjectId());
+
+        // NOTE(review): this checks the already stopped connection2; the advisory
+        // subscriber is connection1 - confirm which connection was intended.
+        assertNoMessagesLeft(connection2);
+    }
+
+    /**
+     * Creates a queue consumer on a second connection before the advisory consumer
+     * exists, then subscribes to the consumer advisory topic of queue "test" and
+     * expects an advisory carrying the id of that earlier consumer, followed by a
+     * {@link RemoveInfo} for it once the second connection is closed.
+     */
+    public void testConsumerAdvisoriesReplayed() throws Exception {
+
+        ActiveMQDestination queue = new ActiveMQQueue("test");
+        ActiveMQDestination destination = AdvisorySupport.getConsumerAdvisoryTopic(queue);
+
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+
+        // Setup a second consumer.
+        StubConnection connection2 = createConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, queue);
+        consumerInfo2.setPrefetchSize(100);
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+        connection2.send(consumerInfo2);
+
+        // We should get an advisory of the previous consumer.
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        consumerInfo1.setPrefetchSize(100);
+        connection1.send(consumerInfo1);
+
+        Message m1 = receiveMessage(connection1);
+        assertNotNull(m1);
+        assertNotNull(m1.getDataStructure());
+        // Expected value first, so a failure reports the two ids the right way round.
+        assertEquals(consumerInfo2.getConsumerId(),
+                ((ConsumerInfo) m1.getDataStructure()).getConsumerId());
+
+        // Close the second connection.
+        connection2.request(closeConnectionInfo(connectionInfo2));
+        connection2.stop();
+
+        // We should get an advisory of the consumer closing
+        m1 = receiveMessage(connection1);
+        assertNotNull(m1);
+        assertNotNull(m1.getDataStructure());
+        RemoveInfo r = (RemoveInfo) m1.getDataStructure();
+        assertEquals(consumerInfo2.getConsumerId(), r.getObjectId());
+
+        // NOTE(review): this checks the already stopped connection2; the advisory
+        // subscriber is connection1 - confirm which connection was intended.
+        assertNoMessagesLeft(connection2);
+    }
+
+
+    /**
+     * Subscribes to the producer advisory topic of queue "test", expects no
+     * advisory up front, then an advisory carrying the id of a producer created on
+     * a second connection, and a {@link RemoveInfo} for that producer once the
+     * second connection is closed.
+     */
+    public void testProducerAdvisories() throws Exception {
+
+        ActiveMQDestination queue = new ActiveMQQueue("test");
+        ActiveMQDestination destination = AdvisorySupport.getProducerAdvisoryTopic(queue);
+
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        consumerInfo1.setPrefetchSize(100);
+
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(consumerInfo1);
+
+        assertNoMessagesLeft(connection1);
+
+        // Setup a producer.
+        StubConnection connection2 = createConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+        producerInfo2.setDestination(queue);
+
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+        connection2.send(producerInfo2);
+
+        // We should get an advisory of the new producer.
+        Message m1 = receiveMessage(connection1);
+        assertNotNull(m1);
+        assertNotNull(m1.getDataStructure());
+        // Expected value first, so a failure reports the two ids the right way round.
+        assertEquals(producerInfo2.getProducerId(),
+                ((ProducerInfo) m1.getDataStructure()).getProducerId());
+
+        // Close the second connection.
+        connection2.request(closeConnectionInfo(connectionInfo2));
+        connection2.stop();
+
+        // We should get an advisory of the producer closing
+        m1 = receiveMessage(connection1);
+        assertNotNull(m1);
+        assertNotNull(m1.getDataStructure());
+        RemoveInfo r = (RemoveInfo) m1.getDataStructure();
+        assertEquals(producerInfo2.getProducerId(), r.getObjectId());
+
+        // NOTE(review): this checks the already stopped connection2; the advisory
+        // subscriber is connection1 - confirm which connection was intended.
+        assertNoMessagesLeft(connection2);
+    }
+    
+    /**
+     * Verifies that an advisory consumer created <em>after</em> a producer was
+     * registered still receives the {@code ProducerInfo} advisory for that
+     * producer, and then a {@code RemoveInfo} advisory once the producer's
+     * connection has been closed.
+     */
+    public void testProducerAdvisoriesReplayed() throws Exception {
+
+        ActiveMQDestination queue = new ActiveMQQueue("test");
+        ActiveMQDestination destination = AdvisorySupport.getProducerAdvisoryTopic(queue);
+
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+
+        // Setup a producer.
+        StubConnection connection2 = createConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+        producerInfo2.setDestination(queue);
+
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+        connection2.send(producerInfo2);
+
+        // Create the advisory consumer.. it should see the previous producer
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        consumerInfo1.setPrefetchSize(100);
+        connection1.send(consumerInfo1);
+
+        Message m1 = receiveMessage(connection1);
+        assertNotNull(m1);
+        assertNotNull(m1.getDataStructure());
+        // JUnit's assertEquals takes the expected value first, then the actual one.
+        assertEquals(producerInfo2.getProducerId(),
+                ((ProducerInfo) m1.getDataStructure()).getProducerId());
+
+        // Close the second connection.
+        connection2.request(closeConnectionInfo(connectionInfo2));
+        connection2.stop();
+
+        // We should get an advisory of the producer closing
+        m1 = receiveMessage(connection1);
+        assertNotNull(m1);
+        assertNotNull(m1.getDataStructure());
+        RemoveInfo r = (RemoveInfo) m1.getDataStructure();
+        assertEquals(producerInfo2.getProducerId(), r.getObjectId());
+
+        assertNoMessagesLeft(connection2);
+    }
+
+    /**
+     * Verifies that the replay of an existing producer's advisory is delivered
+     * only to the newly created advisory consumer: the first consumer sees the
+     * advisory once when the producer is registered, the second consumer sees
+     * it when it subscribes, and the first consumer receives nothing further.
+     */
+    public void testProducerAdvisoriesReplayedOnlyTargetNewConsumer() throws Exception {
+
+        ActiveMQDestination queue = new ActiveMQQueue("test");
+        ActiveMQDestination destination = AdvisorySupport.getProducerAdvisoryTopic(queue);
+
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        // Create the first consumer..
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        consumerInfo1.setPrefetchSize(100);
+        connection1.send(consumerInfo1);
+
+        // Setup a producer.
+        StubConnection connection2 = createConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+        producerInfo2.setDestination(queue);
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+        connection2.send(producerInfo2);
+
+        // The first consumer gets the advisory for the new producer.
+        Message m1 = receiveMessage(connection1);
+        assertNotNull(m1);
+        assertNotNull(m1.getDataStructure());
+        // JUnit's assertEquals takes the expected value first, then the actual one.
+        assertEquals(producerInfo2.getProducerId(),
+                ((ProducerInfo) m1.getDataStructure()).getProducerId());
+
+        // Create the 2nd consumer..
+        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+        consumerInfo2.setPrefetchSize(100);
+        connection2.send(consumerInfo2);
+
+        // The second consumer should see a replay
+        m1 = receiveMessage(connection2);
+        assertNotNull(m1);
+        assertNotNull(m1.getDataStructure());
+        assertEquals(producerInfo2.getProducerId(),
+                ((ProducerInfo) m1.getDataStructure()).getProducerId());
+
+        // But the first consumer should not see the replay.
+        assertNoMessagesLeft(connection1);
+    }
+
+    /**
+     * Builds the JUnit suite for {@code AdvisoryBrokerTest} by delegating to
+     * the {@code suite(Class)} helper, which is declared outside this class body
+     * as shown here.
+     *
+     * @return the suite returned by {@code suite(AdvisoryBrokerTest.class)}
+     */
+    public static Test suite() {
+        return suite(AdvisoryBrokerTest.class);
+    }
+    
+    /**
+     * Command-line entry point: runs {@link #suite()} with JUnit's text UI runner.
+     *
+     * @param args not used
+     */
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryDuplexNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryDuplexNetworkBridgeTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryDuplexNetworkBridgeTest.java
new file mode 100644
index 0000000..e791fbe
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryDuplexNetworkBridgeTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.advisory;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+
+import java.net.URI;
+
+/**
+ * Runs the scenarios of {@link AdvisoryNetworkBridgeTest} with a different
+ * broker setup: broker1 is configured programmatically and listens on
+ * {@code tcp://localhost:61617}, broker2 is loaded from
+ * {@code duplexLocalBroker.xml}, and the network bridge advisories are
+ * expected to report {@code createdByDuplex == true}.
+ */
+public class AdvisoryDuplexNetworkBridgeTest extends AdvisoryNetworkBridgeTest {
+
+    /**
+     * Starts a non-persistent, JMX-less broker named "broker1" with a single
+     * TCP connector and waits until it is fully started.
+     */
+    @Override
+    public void createBroker1() throws Exception {
+        broker1 = new BrokerService();
+        broker1.setBrokerName("broker1");
+        broker1.addConnector("tcp://localhost:61617");
+        broker1.setUseJmx(false);
+        broker1.setPersistent(false);
+        broker1.start();
+        broker1.waitUntilStarted();
+    }
+
+    /**
+     * Starts broker2 from the {@code duplexLocalBroker.xml} xbean configuration
+     * and waits until it is fully started.
+     */
+    @Override
+    public void createBroker2() throws Exception {
+        broker2 = BrokerFactory.createBroker(
+                new URI("xbean:org/apache/activemq/network/duplexLocalBroker.xml"));
+        broker2.start();
+        broker2.waitUntilStarted();
+    }
+
+    /**
+     * In this setup the advisory must be flagged as created by a duplex bridge.
+     *
+     * @param createdByDuplex value of the advisory's "createdByDuplex" property
+     */
+    @Override
+    public void assertCreatedByDuplex(boolean createdByDuplex) {
+        assertTrue(createdByDuplex);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryJmxTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryJmxTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryJmxTest.java
new file mode 100644
index 0000000..d452bed
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryJmxTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.advisory;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.broker.jmx.ManagementContext;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.DestinationInfo;
+
+import javax.jms.*;
+import javax.management.MBeanServerConnection;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+public class AdvisoryJmxTest extends EmbeddedBrokerTestSupport {
+
+        protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setPersistent(isPersistent());
+        answer.addConnector(bindAddress);
+        ManagementContext context = new ManagementContext();
+        context.setConnectorPort(1199);
+        answer.setManagementContext(context);
+        return answer;
+    }
+
+    public void testCreateDeleteDestinations() throws Exception {
+        JMXServiceURL url = new 
JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1199/jmxrmi");
+        JMXConnector connector = JMXConnectorFactory.connect(url, null);
+        connector.connect();
+        MBeanServerConnection connection = 
connector.getMBeanServerConnection();
+        ObjectName name = new 
ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
+        BrokerViewMBean brokerMbean = (BrokerViewMBean) 
MBeanServerInvocationHandler.newProxyInstance(connection, name, 
BrokerViewMBean.class, true);
+        Connection conn = createConnection();
+        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = 
sess.createConsumer(sess.createTopic("ActiveMQ.Advisory.Queue"));
+        conn.start();
+        Destination dest = sess.createQueue("test");
+
+        brokerMbean.addQueue("test");
+
+        ActiveMQMessage msg = (ActiveMQMessage)consumer.receive(1000);
+        assertNotNull(msg);
+        assertTrue(msg.getDataStructure() instanceof DestinationInfo);
+        
assertEquals(((DestinationInfo)msg.getDataStructure()).getDestination(), dest);
+        
assertEquals(((DestinationInfo)msg.getDataStructure()).getOperationType(), 0);
+
+        brokerMbean.removeQueue("test");
+
+        msg = (ActiveMQMessage)consumer.receive(1000);
+        assertNotNull(msg);
+        assertTrue(msg.getDataStructure() instanceof DestinationInfo);
+        
assertEquals(((DestinationInfo)msg.getDataStructure()).getDestination(), dest);
+        
assertEquals(((DestinationInfo)msg.getDataStructure()).getOperationType(), 1);
+
+
+        brokerMbean.addQueue("test");
+        msg = (ActiveMQMessage)consumer.receive(1000);
+        assertNotNull(msg);
+        assertTrue(msg.getDataStructure() instanceof DestinationInfo);
+        
assertEquals(((DestinationInfo)msg.getDataStructure()).getDestination(), dest);
+        
assertEquals(((DestinationInfo)msg.getDataStructure()).getOperationType(), 0);
+        
assertEquals(((DestinationInfo)msg.getDataStructure()).getOperationType(), 0);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java
new file mode 100644
index 0000000..da9cc6d
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryNetworkBridgeTest.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.advisory;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.BrokerInfo;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.net.URI;
+
+/**
+ * Exercises the advisories that a client of broker1 receives on the network
+ * bridge advisory topic while broker2 is started and stopped.
+ * <p>
+ * {@link #createBroker1()}, {@link #createBroker2()} and
+ * {@link #assertCreatedByDuplex(boolean)} are public so that subclasses can run
+ * the same scenarios against a different broker setup.
+ * NOTE(review): the bridge between the two brokers is presumably defined in the
+ * reconnect-broker XML configurations, which are not part of this class.
+ */
+public class AdvisoryNetworkBridgeTest extends TestCase {
+
+    BrokerService broker1;
+    BrokerService broker2;
+
+    /**
+     * Subscribes to the network bridge advisory topic before broker2 exists and
+     * expects a "started" advisory when broker2 starts and a "not started"
+     * advisory when it stops.
+     */
+    public void testAdvisory() throws Exception {
+        createBroker1();
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://broker1");
+        Connection conn = factory.createConnection();
+        try {
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            conn.start();
+            MessageConsumer consumer = sess.createConsumer(AdvisorySupport.getNetworkBridgeAdvisoryTopic());
+
+            Thread.sleep(1000);
+
+            createBroker2();
+
+            ActiveMQMessage advisory = (ActiveMQMessage) consumer.receive(2000);
+            assertNotNull(advisory);
+            assertTrue(advisory.getDataStructure() instanceof BrokerInfo);
+            assertTrue(advisory.getBooleanProperty("started"));
+            assertCreatedByDuplex(advisory.getBooleanProperty("createdByDuplex"));
+
+            broker2.stop();
+            broker2.waitUntilStopped();
+
+            advisory = (ActiveMQMessage) consumer.receive(2000);
+            assertNotNull(advisory);
+            assertTrue(advisory.getDataStructure() instanceof BrokerInfo);
+            assertFalse(advisory.getBooleanProperty("started"));
+        } finally {
+            // Close the connection even when an assertion fails.
+            conn.close();
+        }
+    }
+
+    /**
+     * Subscribes only after both brokers are running and still expects the
+     * "started" advisory, then the "not started" advisory once broker2 stops.
+     * A further consumer created afterwards must not receive any advisory
+     * within one second.
+     */
+    public void testAddConsumerLater() throws Exception {
+        createBroker1();
+
+        createBroker2();
+
+        Thread.sleep(1000);
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://broker1");
+        Connection conn = factory.createConnection();
+        try {
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            conn.start();
+            MessageConsumer consumer = sess.createConsumer(AdvisorySupport.getNetworkBridgeAdvisoryTopic());
+
+            ActiveMQMessage advisory = (ActiveMQMessage) consumer.receive(2000);
+            assertNotNull(advisory);
+            assertTrue(advisory.getDataStructure() instanceof BrokerInfo);
+            assertTrue(advisory.getBooleanProperty("started"));
+            assertCreatedByDuplex(advisory.getBooleanProperty("createdByDuplex"));
+
+            broker2.stop();
+            broker2.waitUntilStopped();
+
+            advisory = (ActiveMQMessage) consumer.receive(2000);
+            assertNotNull(advisory);
+            assertTrue(advisory.getDataStructure() instanceof BrokerInfo);
+            assertFalse(advisory.getBooleanProperty("started"));
+
+            consumer = sess.createConsumer(AdvisorySupport.getNetworkBridgeAdvisoryTopic());
+            advisory = (ActiveMQMessage) consumer.receive(1000);
+            assertNull(advisory);
+        } finally {
+            // Close the connection even when an assertion fails.
+            conn.close();
+        }
+    }
+
+    /**
+     * Hook for the expected value of the advisory's "createdByDuplex" property;
+     * this base class expects {@code false}.
+     *
+     * @param createdByDuplex value read from the received advisory
+     */
+    public void assertCreatedByDuplex(boolean createdByDuplex) {
+        assertFalse(createdByDuplex);
+    }
+
+    /** Starts broker1 from {@code reconnect-broker1.xml} and waits until it is started. */
+    public void createBroker1() throws Exception {
+        broker1 = BrokerFactory.createBroker(
+                new URI("xbean:org/apache/activemq/network/reconnect-broker1.xml"));
+        broker1.start();
+        broker1.waitUntilStarted();
+    }
+
+    /** Starts broker2 from {@code reconnect-broker2.xml} and waits until it is started. */
+    public void createBroker2() throws Exception {
+        broker2 = BrokerFactory.createBroker(
+                new URI("xbean:org/apache/activemq/network/reconnect-broker2.xml"));
+        broker2.start();
+        broker2.waitUntilStarted();
+    }
+
+    /**
+     * Stops both brokers. A broker that was never created (for example because
+     * the test failed early) is skipped, and broker2 is stopped even when
+     * stopping broker1 throws.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            stopBroker(broker1);
+        } finally {
+            stopBroker(broker2);
+        }
+    }
+
+    /** Stops the given broker and waits for it to finish; does nothing for {@code null}. */
+    private static void stopBroker(BrokerService broker) throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/destinations-on-start.xml
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/destinations-on-start.xml
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/destinations-on-start.xml
new file mode 100644
index 0000000..f0ee482
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/destinations-on-start.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- this file can only be parsed using the xbean-spring library -->
+<!-- START SNIPPET: xbean -->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core 
http://activemq.apache.org/schema/core/activemq-core.xsd
+  http://camel.apache.org/schema/spring 
http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+  <bean 
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" 
/>
+
+  <broker xmlns="http://activemq.apache.org/schema/core">
+    <destinations>
+      <queue physicalName="FOO.BAR" />
+      <topic physicalName="SOME.TOPIC" />
+    </destinations>
+
+  </broker>
+
+</beans>
+<!-- END SNIPPET: xbean -->

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/exclusive-consumer-startup-destination.xml
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/exclusive-consumer-startup-destination.xml
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/exclusive-consumer-startup-destination.xml
new file mode 100644
index 0000000..5c10b7d
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/exclusive-consumer-startup-destination.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- this file can only be parsed using the xbean-spring library -->
+<!-- START SNIPPET: xbean -->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core 
http://activemq.apache.org/schema/core/activemq-core.xsd
+  http://camel.apache.org/schema/spring 
http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+  <bean 
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" 
/>
+
+  <broker xmlns="http://activemq.apache.org/schema/core">
+    <destinationPolicy>
+      <policyMap>
+        <policyEntries>
+          <policyEntry queue="TEST.>" allConsumersExclusiveByDefault="true"/>
+        </policyEntries>
+      </policyMap>
+    </destinationPolicy>
+    <destinations>
+      <queue physicalName="TEST.QUEUE1"/>
+      <queue physicalName="TEST.QUEUE2"/>
+      <queue physicalName="TEST.QUEUE3"/>
+    </destinations>
+  </broker>
+
+</beans>
+<!-- END SNIPPET: xbean -->

Reply via email to