http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java
new file mode 100644
index 0000000..5fa0620
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTestSupport.java
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.DeliveryMode;
+import javax.jms.MessageNotWriteableException;
+
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.broker.region.RegionBroker;
+import 
org.apache.activemq.broker.region.policy.FixedCountSubscriptionRecoveryPolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.usage.SystemUsage;
+
+public class BrokerTestSupport extends CombinationTestSupport {
+
+    /**
+     * Setting this to true (the default) makes the tests run faster, because
+     * assertNoMessagesLeft() then polls without waiting, but they may be less
+     * accurate.
+     */
+    public static final boolean FAST_NO_MESSAGE_LEFT_ASSERT = 
System.getProperty("FAST_NO_MESSAGE_LEFT_ASSERT", "true").equals("true");
+
+    protected RegionBroker regionBroker;
+    public BrokerService broker;
+    protected long idGenerator;
+    protected int msgIdGenerator;
+    protected int txGenerator;
+    protected int tempDestGenerator;
+    public PersistenceAdapter persistenceAdapter;
+
+    protected String queueName = "TEST";
+
+    protected int maxWait = 10000;
+
+    protected SystemUsage memoryManager;
+    protected PolicyMap policyMap = new PolicyMap();
+
+    /**
+     * Creates the broker through {@link #createBroker()}, installs the policy
+     * returned by {@link #getDefaultPolicy()} as the default destination
+     * policy and then starts the broker.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        this.broker = createBroker();
+        PolicyEntry defaults = getDefaultPolicy();
+        this.policyMap.setDefaultEntry(defaults);
+        this.broker.setDestinationPolicy(this.policyMap);
+        this.broker.start();
+    }
+
+    /**
+     * @return a new policy entry configured with a round-robin dispatch policy
+     *         and a fixed-count subscription recovery policy; used by
+     *         {@link #setUp()} as the default entry of the policy map
+     */
+    protected PolicyEntry getDefaultPolicy() {
+        final PolicyEntry defaults = new PolicyEntry();
+        final RoundRobinDispatchPolicy dispatch = new RoundRobinDispatchPolicy();
+        defaults.setDispatchPolicy(dispatch);
+        final FixedCountSubscriptionRecoveryPolicy recovery = new FixedCountSubscriptionRecoveryPolicy();
+        defaults.setSubscriptionRecoveryPolicy(recovery);
+        return defaults;
+    }
+
+    /**
+     * Builds (but does not start) the broker under test from the URI
+     * {@code broker:()/localhost?persistent=false}. Subclasses may override
+     * this to supply a differently configured broker.
+     */
+    protected BrokerService createBroker() throws Exception {
+        final URI brokerUri = new URI("broker:()/localhost?persistent=false");
+        return BrokerFactory.createBroker(brokerUri);
+    }
+
+    /**
+     * Stops the broker, waits for it to finish stopping and clears the
+     * references held by this fixture.
+     *
+     * The field reset and {@code super.tearDown()} run in a finally block so
+     * that a failure while stopping the broker cannot skip them, and the
+     * broker is only stopped when {@link #setUp()} got far enough to create it.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            // broker is still null when setUp() failed before or inside createBroker()
+            if (broker != null) {
+                broker.stop();
+                broker.waitUntilStopped();
+            }
+        } finally {
+            broker = null;
+            regionBroker = null;
+            persistenceAdapter = null;
+            memoryManager = null;
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Builds a consumer command for the given session and destination.
+     * The consumer gets the next value of {@code idGenerator} as its id, is
+     * not a browser, uses a prefetch size of 1000 and has asynchronous
+     * dispatch switched off.
+     */
+    protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, ActiveMQDestination destination) throws Exception {
+        final long consumerValue = ++idGenerator;
+        final ConsumerInfo consumer = new ConsumerInfo(sessionInfo, consumerValue);
+        consumer.setBrowser(false);
+        consumer.setDestination(destination);
+        consumer.setPrefetchSize(1000);
+        consumer.setDispatchAsync(false);
+        return consumer;
+    }
+
+    /**
+     * @return the remove command created by the given consumer info
+     */
+    protected RemoveInfo closeConsumerInfo(ConsumerInfo consumerInfo) {
+        final RemoveInfo removal = consumerInfo.createRemoveCommand();
+        return removal;
+    }
+
+    /**
+     * Builds a producer command for the given session, using the next value of
+     * {@code idGenerator} as the producer id.
+     */
+    protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception {
+        return new ProducerInfo(sessionInfo, ++idGenerator);
+    }
+
+    /**
+     * Builds a session command for the given connection, using the next value
+     * of {@code idGenerator} as the session id.
+     */
+    protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception {
+        return new SessionInfo(connectionInfo, ++idGenerator);
+    }
+
+    /**
+     * Builds a connection command whose connection id is
+     * {@code "connection:" + n}, where n is the next value of
+     * {@code idGenerator}. The client id is set to the value of that
+     * connection id.
+     */
+    protected ConnectionInfo createConnectionInfo() throws Exception {
+        final String idText = "connection:" + (++idGenerator);
+        final ConnectionInfo connectionInfo = new ConnectionInfo();
+        connectionInfo.setConnectionId(new ConnectionId(idText));
+        connectionInfo.setClientId(connectionInfo.getConnectionId().getValue());
+        return connectionInfo;
+    }
+
+    /**
+     * Builds a non-persistent text message with the body
+     * {@code "Test Message Payload."} addressed to the given destination. The
+     * message id is derived from the producer and the next value of
+     * {@code msgIdGenerator}.
+     *
+     * @throws IllegalStateException if the body cannot be set on the new message
+     */
+    protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
+        message.setDestination(destination);
+        message.setPersistent(false);
+        try {
+            message.setText("Test Message Payload.");
+        } catch (MessageNotWriteableException e) {
+            // A message created just above should be writable. Report the
+            // failure instead of silently handing back a message with no body.
+            throw new IllegalStateException("Could not set the payload of a newly created message", e);
+        }
+        return message;
+    }
+
+    /**
+     * Builds an acknowledgement of the given type for {@code count} messages.
+     * The ack carries the consumer's id, and takes its destination and last
+     * message id from {@code msg}.
+     */
+    protected MessageAck createAck(ConsumerInfo consumerInfo, Message msg, int count, byte ackType) {
+        final MessageAck acknowledgement = new MessageAck();
+        acknowledgement.setAckType(ackType);
+        acknowledgement.setConsumerId(consumerInfo.getConsumerId());
+        acknowledgement.setDestination(msg.getDestination());
+        acknowledgement.setLastMessageId(msg.getMessageId());
+        acknowledgement.setMessageCount(count);
+        return acknowledgement;
+    }
+
+    /**
+     * Delegates to {@code regionBroker.gc()}.
+     * NOTE(review): nothing in this class assigns {@code regionBroker};
+     * presumably a subclass sets it before calling this - confirm.
+     */
+    protected void gc() {
+        final RegionBroker target = regionBroker;
+        target.gc();
+    }
+
+    /**
+     * When the system property {@code profiler} is set, prints the prompt and
+     * blocks until a line is entered on standard input; otherwise does nothing.
+     *
+     * @param prompt text printed in front of the "Press enter" and "Done" messages
+     * @throws IOException if reading standard input fails
+     */
+    protected void profilerPause(String prompt) throws IOException {
+        if (System.getProperty("profiler") == null) {
+            return;
+        }
+        System.out.println();
+        System.out.println(prompt + "> Press enter to continue: ");
+        // read() returns -1 at end of stream. Without that check a closed or
+        // exhausted stdin would keep this loop spinning forever.
+        int c;
+        do {
+            c = System.in.read();
+        } while (c != '\n' && c != -1);
+        System.out.println(prompt + "> Done.");
+    }
+
+    /**
+     * @return the remove command created by the given connection info
+     */
+    protected RemoveInfo closeConnectionInfo(ConnectionInfo info) {
+        final RemoveInfo removal = info.createRemoveCommand();
+        return removal;
+    }
+
+    /**
+     * @return the remove command created by the given session info
+     */
+    protected RemoveInfo closeSessionInfo(SessionInfo info) {
+        final RemoveInfo removal = info.createRemoveCommand();
+        return removal;
+    }
+
+    /**
+     * @return the remove command created by the given producer info
+     */
+    protected RemoveInfo closeProducerInfo(ProducerInfo info) {
+        final RemoveInfo removal = info.createRemoveCommand();
+        return removal;
+    }
+
+    /**
+     * Same as {@link #createMessage(ProducerInfo, ActiveMQDestination)}, but
+     * marks the message persistent exactly when {@code deliveryMode} equals
+     * {@link DeliveryMode#PERSISTENT}.
+     */
+    protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode) {
+        final boolean persistent = deliveryMode == DeliveryMode.PERSISTENT;
+        final Message created = createMessage(producerInfo, destination);
+        created.setPersistent(persistent);
+        return created;
+    }
+
+    /**
+     * Builds a local transaction id from the parent id of the session's id
+     * and the next value of {@code txGenerator}.
+     */
+    protected LocalTransactionId createLocalTransaction(SessionInfo info) {
+        return new LocalTransactionId(info.getSessionId().getParentId(), ++txGenerator);
+    }
+
+    /**
+     * Builds an XA transaction id with format id 55. The next value of
+     * {@code txGenerator} is written as a long into a byte array, and that one
+     * array is used as both the branch qualifier and the global transaction
+     * id. The {@code info} argument is not used.
+     */
+    protected XATransactionId createXATransaction(SessionInfo info) throws IOException {
+        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        final DataOutputStream out = new DataOutputStream(buffer);
+        out.writeLong(++txGenerator);
+        out.close();
+        final byte[] idBytes = buffer.toByteArray();
+
+        final XATransactionId xaId = new XATransactionId();
+        xaId.setBranchQualifier(idBytes);
+        xaId.setGlobalTransactionId(idBytes);
+        xaId.setFormatId(55);
+        return xaId;
+    }
+
+    /**
+     * Builds a {@code TransactionInfo.BEGIN} command for {@code txid} on the
+     * given connection.
+     */
+    protected TransactionInfo createBeginTransaction(ConnectionInfo connectionInfo, TransactionId txid) {
+        return new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.BEGIN);
+    }
+
+    /**
+     * Builds a {@code TransactionInfo.PREPARE} command for {@code txid} on the
+     * given connection.
+     */
+    protected TransactionInfo createPrepareTransaction(ConnectionInfo connectionInfo, TransactionId txid) {
+        return new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.PREPARE);
+    }
+
+    /**
+     * Builds a {@code TransactionInfo.COMMIT_ONE_PHASE} command for
+     * {@code txid} on the given connection.
+     */
+    protected TransactionInfo createCommitTransaction1Phase(ConnectionInfo connectionInfo, TransactionId txid) {
+        return new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.COMMIT_ONE_PHASE);
+    }
+
+    /**
+     * Builds a {@code TransactionInfo.COMMIT_TWO_PHASE} command for
+     * {@code txid} on the given connection.
+     */
+    protected TransactionInfo createCommitTransaction2Phase(ConnectionInfo connectionInfo, TransactionId txid) {
+        return new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.COMMIT_TWO_PHASE);
+    }
+
+    /**
+     * Builds a {@code TransactionInfo.ROLLBACK} command for {@code txid} on
+     * the given connection.
+     */
+    protected TransactionInfo createRollbackTransaction(ConnectionInfo connectionInfo, TransactionId txid) {
+        return new TransactionInfo(connectionInfo.getConnectionId(), txid, TransactionInfo.ROLLBACK);
+    }
+
+    /**
+     * Counts the messages dispatched to a temporary browser consumer on the
+     * destination.
+     *
+     * A new session and a browser consumer with prefetch 1 are opened on the
+     * connection. Every dispatch for that consumer that carries a message is
+     * counted and acknowledged. Counting stops at the first dispatch for the
+     * consumer without a message (presumably the end-of-browse marker), or
+     * when nothing arrives within {@code maxWait} milliseconds. Anything else
+     * taken from the dispatch queue is put back afterwards, and the session is
+     * closed.
+     */
+    protected int countMessagesInQueue(StubConnection connection, ConnectionInfo connectionInfo, ActiveMQDestination destination) throws Exception {
+
+        SessionInfo browseSession = createSessionInfo(connectionInfo);
+        connection.send(browseSession);
+        ConsumerInfo browser = createConsumerInfo(browseSession, destination);
+        browser.setPrefetchSize(1);
+        browser.setBrowser(true);
+        connection.send(browser);
+
+        ArrayList<Object> unrelated = new ArrayList<Object>();
+        int count = 0;
+
+        for (Object next = connection.getDispatchQueue().poll(maxWait, TimeUnit.MILLISECONDS);
+             next != null;
+             next = connection.getDispatchQueue().poll(maxWait, TimeUnit.MILLISECONDS)) {
+
+            boolean forBrowser = next instanceof MessageDispatch
+                && ((MessageDispatch)next).getConsumerId().equals(browser.getConsumerId());
+            if (!forBrowser) {
+                // not ours: remember it so it can be returned to the queue below
+                unrelated.add(next);
+                continue;
+            }
+
+            Message browsed = ((MessageDispatch)next).getMessage();
+            if (browsed == null) {
+                break;
+            }
+            count++;
+            connection.send(createAck(browser, browsed, 1, MessageAck.STANDARD_ACK_TYPE));
+        }
+
+        for (Object item : unrelated) {
+            connection.getDispatchQueue().put(item);
+        }
+
+        connection.send(closeSessionInfo(browseSession));
+        return count;
+
+    }
+
+    /**
+     * Builds an ADD command for a new destination of the given type. The
+     * destination is named {@code <connectionId>:<n>}, where n is the next
+     * value of {@code tempDestGenerator}.
+     */
+    protected DestinationInfo createTempDestinationInfo(ConnectionInfo connectionInfo, byte destinationType) {
+        final DestinationInfo tempInfo = new DestinationInfo();
+        tempInfo.setConnectionId(connectionInfo.getConnectionId());
+        tempInfo.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
+        final String name = tempInfo.getConnectionId() + ":" + (++tempDestGenerator);
+        tempInfo.setDestination(ActiveMQDestination.createDestination(name, destinationType));
+        return tempInfo;
+    }
+
+    /**
+     * Returns a destination of the requested type. When the type has the
+     * {@code TEMP_MASK} bit set, a new temporary destination is announced to
+     * the broker over {@code connection} and returned. Otherwise the
+     * destination named {@code queueName} is returned and nothing is sent.
+     */
+    protected ActiveMQDestination createDestinationInfo(StubConnection connection, ConnectionInfo connectionInfo1, byte destinationType) throws Exception {
+        final boolean temporary = (destinationType & ActiveMQDestination.TEMP_MASK) != 0;
+        if (!temporary) {
+            return ActiveMQDestination.createDestination(queueName, destinationType);
+        }
+        final DestinationInfo tempInfo = createTempDestinationInfo(connectionInfo1, destinationType);
+        connection.send(tempInfo);
+        return tempInfo.getDestination();
+    }
+
+    /**
+     * Converts the given info into a removal request in place: the operation
+     * type becomes {@code REMOVE_OPERATION_TYPE} and the timeout 0.
+     *
+     * @return the same (now modified) instance that was passed in
+     */
+    protected DestinationInfo closeDestinationInfo(DestinationInfo info) {
+        final DestinationInfo removal = info;
+        removal.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
+        removal.setTimeout(0);
+        return removal;
+    }
+
+    /**
+     * Deletes {@code f}; when it is a directory its contents are deleted
+     * first, depth-first. Failures to delete individual entries are ignored,
+     * as is a path that does not exist.
+     *
+     * @param f the file or directory to remove
+     */
+    public static void recursiveDelete(File f) {
+        if (f.isDirectory()) {
+            // listFiles() returns null when the directory cannot be read
+            // (I/O error, or it vanished after the isDirectory() check).
+            File[] children = f.listFiles();
+            if (children != null) {
+                for (File child : children) {
+                    recursiveDelete(child);
+                }
+            }
+        }
+        f.delete();
+    }
+
+    /**
+     * @return a new stub connection created on the broker under test
+     */
+    protected StubConnection createConnection() throws Exception {
+        final StubConnection stub = new StubConnection(broker);
+        return stub;
+    }
+
+    /**
+     * Receives the next message from the connection, polling with the
+     * default {@code maxWait} timeout (in milliseconds).
+     *
+     * @param connection the connection whose dispatch queue is polled
+     * @return the received message, or {@code null} under the conditions
+     *         described on {@link #receiveMessage(StubConnection, long)}
+     * @throws InterruptedException if interrupted while waiting
+     */
+    public Message receiveMessage(StubConnection connection) throws InterruptedException {
+        final long timeout = maxWait;
+        return receiveMessage(connection, timeout);
+    }
+
+    /**
+     * Polls the connection's dispatch queue until a {@code MessageDispatch}
+     * arrives; other objects taken from the queue are discarded. The timeout
+     * applies to each poll separately.
+     *
+     * @param connection the connection whose dispatch queue is polled
+     * @param timeout    how long each poll waits, in milliseconds
+     * @return a copy of the dispatched message with its redelivery counter
+     *         taken from the dispatch, or {@code null} when a poll times out
+     *         or the dispatch carries no message
+     * @throws InterruptedException if interrupted while waiting
+     */
+    public Message receiveMessage(StubConnection connection, long timeout) throws InterruptedException {
+        for (;;) {
+            final Object next = connection.getDispatchQueue().poll(timeout, TimeUnit.MILLISECONDS);
+            if (next == null) {
+                return null;
+            }
+            if (!(next instanceof MessageDispatch)) {
+                continue;
+            }
+
+            final MessageDispatch dispatch = (MessageDispatch)next;
+            if (dispatch.getMessage() == null) {
+                return null;
+            }
+            // hand out a copy so the caller does not share the dispatched instance
+            dispatch.setMessage(dispatch.getMessage().copy());
+            dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter());
+            return dispatch.getMessage();
+        }
+    }
+
+    /**
+     * Drains the connection's dispatch queue and fails the test as soon as a
+     * dispatch carrying a message is found. Each poll waits 0 ms when
+     * {@code FAST_NO_MESSAGE_LEFT_ASSERT} is true and {@code maxWait} ms
+     * otherwise. The method returns once a poll comes back empty.
+     */
+    protected void assertNoMessagesLeft(StubConnection connection) throws InterruptedException {
+        final long wait = FAST_NO_MESSAGE_LEFT_ASSERT ? 0 : maxWait;
+        for (Object next = connection.getDispatchQueue().poll(wait, TimeUnit.MILLISECONDS);
+             next != null;
+             next = connection.getDispatchQueue().poll(wait, TimeUnit.MILLISECONDS)) {
+            if (!(next instanceof MessageDispatch)) {
+                continue;
+            }
+            final Message leftover = ((MessageDispatch)next).getMessage();
+            if (leftover != null) {
+                fail("Received a message: " + leftover.getMessageId());
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ConcurrentConnectSimulationTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ConcurrentConnectSimulationTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ConcurrentConnectSimulationTest.java
new file mode 100644
index 0000000..0c791fd
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ConcurrentConnectSimulationTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import junit.framework.Test;
+
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.SessionId;
+
+public class ConcurrentConnectSimulationTest extends BrokerTestSupport {
+
+    public static Test suite() {
+        return suite(ConcurrentConnectSimulationTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    /*
+     * Simulates a failover retry that reaches the broker before it has killed
+     * the old connection, which the broker sees as a concurrent connect
+     * request for the same connection info.
+     * see: https://issues.apache.org/activemq/browse/AMQ-2241
+     */
+    public void testConcurrentConnection() throws Exception {
+
+        StubConnection first = createConnection();
+        StubConnection second = createConnection();
+
+        // both connections register with the very same connection info
+        ConnectionInfo sharedInfo = createConnectionInfo();
+        first.request(sharedInfo);
+        second.request(sharedInfo);
+
+        // the second one should win out; verify by adding a consumer on the
+        // default session (id -1, watchAdvisories) through the second connection
+        SessionId defaultSession = new SessionId(sharedInfo.getConnectionId(), -1);
+        ConsumerInfo advisoryConsumer = new ConsumerInfo(new ConsumerId(defaultSession, 1));
+        advisoryConsumer.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
+
+        second.request(advisoryConsumer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/CreateDestinationsOnStartupViaXBeanTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/CreateDestinationsOnStartupViaXBeanTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/CreateDestinationsOnStartupViaXBeanTest.java
new file mode 100644
index 0000000..70fda7c
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/CreateDestinationsOnStartupViaXBeanTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.net.URI;
+import java.util.Set;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.xbean.XBeanBrokerFactory;
+
+/**
+ * Starts a broker from the XBean configuration returned by
+ * {@link #getBrokerConfigUri()} and checks which destinations exist on the
+ * broker once it is running.
+ */
+public class CreateDestinationsOnStartupViaXBeanTest extends EmbeddedBrokerTestSupport {
+
+    public void testNewDestinationsAreCreatedOnStartup() throws Exception {
+        assertQueueCreated("FOO.BAR", true);
+        assertQueueCreated("FOO.DoesNotExist", false);
+
+        assertTopicCreated("SOME.TOPIC", true);
+        assertTopicCreated("FOO.DoesNotExist", false);
+    }
+
+    /** Asserts that the queue with the given name does (or does not) exist. */
+    protected void assertQueueCreated(String name, boolean expected) throws Exception {
+        assertDestinationCreated(new ActiveMQQueue(name), expected);
+    }
+
+    /** Asserts that the topic with the given name does (or does not) exist. */
+    protected void assertTopicCreated(String name, boolean expected) throws Exception {
+        assertDestinationCreated(new ActiveMQTopic(name), expected);
+    }
+
+    /**
+     * Asks the broker for the destinations matching {@code destination} and
+     * asserts that exactly one is found when {@code expected} is true, and
+     * none otherwise.
+     */
+    protected void assertDestinationCreated(ActiveMQDestination destination, boolean expected) throws Exception {
+        // wildcard instead of the raw type: only size() and toString() are needed
+        Set<?> answer = broker.getBroker().getDestinations(destination);
+        int size = expected ? 1 : 0;
+        assertEquals("Could not find destination: " + destination + ". Size of found destinations: " + answer, size, answer.size());
+    }
+
+    /** Creates the broker from the XBean configuration, with persistence switched off. */
+    protected BrokerService createBroker() throws Exception {
+        XBeanBrokerFactory factory = new XBeanBrokerFactory();
+        BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri()));
+
+        // lets disable persistence as we are a test
+        answer.setPersistent(false);
+
+        return answer;
+    }
+
+    /** @return the location of the XBean broker configuration used by this test */
+    protected String getBrokerConfigUri() {
+        return "org/apache/activemq/broker/destinations-on-start.xml";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DedicatedTaskRunnerBrokerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DedicatedTaskRunnerBrokerTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DedicatedTaskRunnerBrokerTest.java
new file mode 100644
index 0000000..c186420
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DedicatedTaskRunnerBrokerTest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import junit.framework.Test;
+
+/**
+ * Runs the tests inherited from {@code BrokerTest} against a broker that has
+ * the dedicated task runner switched on.
+ */
+public class DedicatedTaskRunnerBrokerTest extends BrokerTest {
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    public static Test suite() {
+        return suite(DedicatedTaskRunnerBrokerTest.class);
+    }
+
+    /** Takes the broker built by the superclass and enables its dedicated task runner. */
+    protected BrokerService createBroker() throws Exception {
+        final BrokerService dedicated = super.createBroker();
+        dedicated.setDedicatedTaskRunner(true);
+        return dedicated;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java
new file mode 100644
index 0000000..8fd1292
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DoubleSubscriptionTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import javax.jms.DeliveryMode;
+
+import junit.framework.Test;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.network.NetworkTestSupport;
+
+/**
+ * Pretend to be an abusive client that sends multiple identical ConsumerInfo
+ * commands and make sure the broker doesn't stall because of it.
+ *
+ * All connections in this test are opened with createRemoteConnection(),
+ * which is not defined in this class (presumably inherited from
+ * NetworkTestSupport).
+ */
+
+public class DoubleSubscriptionTest extends NetworkTestSupport {
+
+    // Populated through the "destination" combination registered in
+    // initCombosForTestDoubleSubscription().
+    public ActiveMQDestination destination;
+    // NOTE(review): not read anywhere in this class; the test always sends
+    // with DeliveryMode.PERSISTENT.
+    public int deliveryMode;
+
+    // Value returned by getRemoteURI(); port 0 presumably lets the OS pick a free port.
+    private String remoteURI = "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
+
+    public static Test suite() {
+        return suite(DoubleSubscriptionTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    // NOTE(review): both combination values are the same queue "TEST", so the
+    // test presumably runs twice with identical input - confirm this is intended.
+    public void initCombosForTestDoubleSubscription() {
+        addCombinationValues("destination", new Object[] {new ActiveMQQueue("TEST"), new ActiveMQQueue("TEST")});
+    }
+
+    /**
+     * Registers a consumer, re-sends the same connection, session and
+     * consumer commands while a message is waiting on the broker, and then
+     * checks that a fresh consumer still receives the two outstanding messages.
+     */
+    public void testDoubleSubscription() throws Exception {
+
+        // Start a normal consumer on the remote broker
+        StubConnection connection1 = createRemoteConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.request(consumerInfo1);
+
+        // Start a normal producer on a remote broker
+        StubConnection connection2 = createRemoteConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+        connection2.request(producerInfo2);
+
+        // Send a message to make sure the basics are working
+        connection2.request(createMessage(producerInfo2, destination, DeliveryMode.PERSISTENT));
+
+        Message m1 = receiveMessage(connection1);
+        assertNotNull(m1);
+        assertNoMessagesLeft(connection1);
+
+        // Acknowledge the first message so that it is no longer outstanding
+        connection1.send(createAck(consumerInfo1, m1, 1, MessageAck.STANDARD_ACK_TYPE));
+
+        // Send a message to sit on the broker while we mess with it
+        connection2.request(createMessage(producerInfo2, destination, DeliveryMode.PERSISTENT));
+
+        // Now we're going to resend the same consumer commands again and see
+        // if the broker can handle it.
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.request(consumerInfo1);
+
+        // After this there should be 2 messages on the broker...
+        connection2.request(createMessage(producerInfo2, destination, DeliveryMode.PERSISTENT));
+
+        // ... let's start a fresh consumer...
+        connection1.stop();
+        StubConnection connection3 = createRemoteConnection();
+        ConnectionInfo connectionInfo3 = createConnectionInfo();
+        SessionInfo sessionInfo3 = createSessionInfo(connectionInfo3);
+        ConsumerInfo consumerInfo3 = createConsumerInfo(sessionInfo3, destination);
+        connection3.send(connectionInfo3);
+        connection3.send(sessionInfo3);
+        connection3.request(consumerInfo3);
+
+        // ... and then grab the 2 that should be there.
+        assertNotNull(receiveMessage(connection3));
+        assertNotNull(receiveMessage(connection3));
+        assertNoMessagesLeft(connection3);
+    }
+
+    // Supplies the URI used for the remote side; presumably overrides a hook
+    // in NetworkTestSupport - confirm.
+    protected String getRemoteURI() {
+        return remoteURI;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DurablePersistentFalseRestartTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DurablePersistentFalseRestartTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DurablePersistentFalseRestartTest.java
new file mode 100644
index 0000000..b972498
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/DurablePersistentFalseRestartTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.transport.failover.FailoverTransport;
+
+public class DurablePersistentFalseRestartTest extends 
BrokerRestartTestSupport { // Verifies that a durable subscriber gets no messages back after restarting a broker configured with persistent=false.
+
+    @Override
+    protected void configureBroker(BrokerService broker) throws Exception { // Applies the parent configuration, then disables persistence and opens a TCP connector.
+        super.configureBroker(broker);
+        broker.setPersistent(false);
+        broker.setPersistenceAdapter(new KahaDBPersistenceAdapter()); // a KahaDB adapter is still assigned although persistence was just switched off
+        broker.addConnector("tcp://0.0.0.0:0"); // port 0; the test reads the real address back via getPublishableConnectString()
+    }
+
+    public void testValidateNoPersistenceForDurableAfterRestart() throws 
Exception { // Publishes 10 messages to a durable subscription, restarts the broker, and expects receive() to return null.
+
+        ConnectionFactory connectionFactory =
+                new ActiveMQConnectionFactory("failover:(" + 
broker.getTransportConnectors().get(0).getPublishableConnectString() + ")");
+        ActiveMQConnection connection = (ActiveMQConnection) 
connectionFactory.createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+        Topic destination = session.createTopic(queueName);
+        MessageConsumer consumer = 
session.createDurableSubscriber(destination, "subscriberName");
+
+        populateDestination(10, destination, connection);
+
+        restartBroker();
+
+        // make failover aware of the restarted broker's auto-assigned port
+        ((FailoverTransport) 
connection.getTransport().narrow(FailoverTransport.class)).add(true, 
broker.getTransportConnectors().get(0).getPublishableConnectString());
+
+        TextMessage msg = (TextMessage) consumer.receive(4000); // wait up to 4000 ms for a message
+        assertNull("did not get a message when persistent=false, message: " + 
msg, msg); // NOTE(review): this text is printed when a message WAS received, so its wording reads backwards
+
+        connection.close(); // NOTE(review): not in a try/finally, so it is skipped when the assertion above fails
+    }
+
+    private void populateDestination(final int nbMessages,
+                                     final Destination destination, 
javax.jms.Connection connection)
+            throws JMSException { // Sends nbMessages text messages to destination through a short-lived session and producer.
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        for (int i = 1; i <= nbMessages; i++) { // ids in the payload run from 1 to nbMessages inclusive
+            producer.send(session.createTextMessage("<hello id='" + i + 
"'/>"));
+        }
+        producer.close();
+        session.close();
+    }
+
+
+    public static Test suite() { // Builds the suite through the inherited suite(Class) helper.
+        return suite(DurablePersistentFalseRestartTest.class);
+    }
+
+    public static void main(String[] args) { // Runs the suite with the JUnit text runner; args is not used.
+        junit.textui.TestRunner.run(suite());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java
new file mode 100644
index 0000000..5788dad
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import junit.framework.Test;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.apache.derby.jdbc.EmbeddedXADataSource;
+
+public class JdbcXARecoveryBrokerTest extends XARecoveryBrokerTest { // XARecoveryBrokerTest variant whose broker uses a JDBCPersistenceAdapter on an embedded Derby XA data source.
+
+    EmbeddedXADataSource dataSource; // replaced with a fresh instance by restartBroker()
+
+    @Override
+    protected void setUp() throws Exception { // Creates the Derby data source first; presumably super.setUp() is what ends up calling configureBroker() — confirm in the superclass.
+        dataSource = new EmbeddedXADataSource();
+        dataSource.setDatabaseName("derbyDb");
+        dataSource.setCreateDatabase("create");
+        super.setUp();
+    }
+
+    @Override
+    protected void tearDown() throws Exception { // Parent teardown first, then shut Derby down.
+        super.tearDown();
+        stopDerby();
+    }
+
+    @Override
+    protected void configureBroker(BrokerService broker) throws Exception { // Adds a JDBC persistence adapter backed by the current dataSource on top of the parent configuration.
+        super.configureBroker(broker);
+
+        JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
+        jdbc.setDataSource(dataSource);
+        broker.setPersistenceAdapter(jdbc);
+    }
+
+    @Override
+    protected void restartBroker() throws Exception { // Stops the broker, shuts Derby down, builds a new data source, then starts the broker returned by createRestartedBroker().
+        broker.stop();
+        stopDerby();
+        dataSource = new EmbeddedXADataSource(); // same database name and create flag as in setUp()
+        dataSource.setDatabaseName("derbyDb");
+        dataSource.setCreateDatabase("create");
+
+        broker = createRestartedBroker();
+        broker.start();
+    }
+
+    private void stopDerby() { // Requests a Derby shutdown through the current data source; any exception is deliberately ignored.
+        LOG.info("STOPPING DB!@!!!!");
+        final EmbeddedDataSource ds = dataSource;
+        try {
+            ds.setShutdownDatabase("shutdown");
+            ds.getConnection(); // presumably this connection attempt is what triggers the shutdown — see the Derby data source docs
+        } catch (Exception ignored) { // Derby is documented to signal a completed shutdown by throwing SQLException, so there is nothing to handle here
+        }
+
+    }
+
+    public static Test suite() { // Builds the suite through the inherited suite(Class) helper.
+        return suite(JdbcXARecoveryBrokerTest.class);
+    }
+
+    public static void main(String[] args) { // Runs the suite with the JUnit text runner; args is not used.
+        junit.textui.TestRunner.run(suite());
+    }
+
+    @Override
+    protected ActiveMQDestination createDestination() { // NOTE(review): the comma presumably makes this a composite destination ("test" plus "special") — confirm.
+        return new ActiveMQQueue("test,special");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/Main.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/Main.java 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/Main.java
new file mode 100644
index 0000000..9e1fa5e
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/Main.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.jmx.ManagementContext;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.demo.DefaultQueueSender;
+
+/**
+ * A helper class which can be handy for running a broker in your IDE from the
+ * activemq-core module.
+ * 
+ * 
+ */
+public final class Main {
+    protected static boolean createConsumers; // never assigned in this class, so it stays false unless other code in the package sets it
+
+    private Main() {        
+    }
+    
+    /** Starts a non-persistent broker with TCP and STOMP connectors and publishes sample messages to two queues.
+     * @param args not used
+     */
+    public static void main(String[] args) {
+        try {
+            BrokerService broker = new BrokerService();
+            broker.setPersistent(false);
+
+            // String brokerDir = "xbean:...;
+            // System.setProperty("activemq.base", brokerDir);
+            // BrokerService broker = BrokerFactory.createBroker(new 
URI(brokerDir + "/activemq.xml"));
+
+            // for running on Java 5 without mx4j
+            ManagementContext managementContext = 
broker.getManagementContext();
+            managementContext.setFindTigerMbeanServer(true);
+            managementContext.setUseMBeanServer(true);
+            managementContext.setCreateConnector(false);
+
+            broker.setUseJmx(true);
+            // broker.setPlugins(new BrokerPlugin[] { new
+            // ConnectionDotFilePlugin(), new UDPTraceBrokerPlugin() });
+            broker.addConnector("tcp://localhost:61616");
+            broker.addConnector("stomp://localhost:61613");
+            broker.start();
+
+            // let's publish some messages so that there is some stuff to browse
+            DefaultQueueSender.main(new String[] {"Prices.Equity.IBM"});
+            DefaultQueueSender.main(new String[] {"Prices.Equity.MSFT"});
+
+            // let's create a couple of dummy consumers
+            if (createConsumers) { // NOTE(review): this branch never calls waitUntilStopped(); main() returns right after creating the consumers
+                Connection connection = new 
ActiveMQConnectionFactory().createConnection();
+                connection.start();
+                Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                session.createConsumer(new ActiveMQQueue("Orders.IBM"));
+                session.createConsumer(new ActiveMQQueue("Orders.MSFT"), 
"price > 100");
+                Session session2 = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                session2.createConsumer(new ActiveMQQueue("Orders.MSFT"), 
"price > 200");
+            } else {
+                // Otherwise block here until the broker is stopped
+                broker.waitUntilStopped();
+            }
+        } catch (Exception e) { // any failure is only reported on stdout/stderr; nothing is rethrown
+            System.out.println("Failed: " + e);
+            e.printStackTrace();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/MarshallingBrokerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/MarshallingBrokerTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/MarshallingBrokerTest.java
new file mode 100644
index 0000000..3175156
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/MarshallingBrokerTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.io.IOException;
+
+import junit.framework.Test;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * Runs against the broker but marshals all request and response commands.
+ * 
+ * 
+ */
+public class MarshallingBrokerTest extends BrokerTest {
+
+    public WireFormat wireFormat = new OpenWireFormat(); // named as a combination key in initCombos(); presumably assigned per combination by the test support class — confirm
+
+    public void initCombos() { // Registers two OpenWireFormat instances as values for "wireFormat": one with caching disabled, one with caching enabled.
+
+        OpenWireFormat wf1 = new OpenWireFormat();
+        wf1.setCacheEnabled(false);
+        OpenWireFormat wf2 = new OpenWireFormat();
+        wf2.setCacheEnabled(true);
+
+        addCombinationValues("wireFormat", new Object[] {wf1, wf2, });
+    }
+
+    protected StubConnection createConnection() throws Exception { // Returns a StubConnection that round-trips every command through wireFormat marshal/unmarshal before delegating to super.
+        return new StubConnection(broker) {
+            public Response request(Command command) throws Exception { // marshals the outgoing command and, when non-null, the response as well
+                Response r = 
super.request((Command)wireFormat.unmarshal(wireFormat.marshal(command)));
+                if (r != null) {
+                    r = (Response)wireFormat.unmarshal(wireFormat.marshal(r));
+                }
+                return r;
+            }
+
+            public void send(Command command) throws Exception { // one-way send of a marshalled copy of the command
+                
super.send((Command)wireFormat.unmarshal(wireFormat.marshal(command)));
+            }
+
+            protected void dispatch(Command command) throws 
InterruptedException, IOException { // dispatches a marshalled copy of the command
+                
super.dispatch((Command)wireFormat.unmarshal(wireFormat.marshal(command)));
+            }; // the ';' after the method body is an empty declaration and has no effect
+        };
+    }
+
+    public static Test suite() { // Builds the suite through the inherited suite(Class) helper.
+        return suite(MarshallingBrokerTest.class);
+    }
+
+    public static void main(String[] args) { // Runs the suite with the JUnit text runner; args is not used.
+        junit.textui.TestRunner.run(suite());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
new file mode 100644
index 0000000..5c7f29d
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import javax.jms.DeliveryMode;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import 
org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+
+public class MessageExpirationTest extends BrokerTestSupport {
+
+    public ActiveMQDestination destination; // assigned at the start of each test from createDestinationInfo(...)
+    public int deliveryMode = DeliveryMode.NON_PERSISTENT; // listed as a combination key by the initCombosFor* methods
+    public int prefetch; // not read anywhere in this class; the tests pass literal prefetch sizes
+    public byte destinationType = ActiveMQDestination.QUEUE_TYPE; // listed as a combination key by the initCombosFor* methods
+    public boolean durableConsumer; // not read anywhere in this class
+
+    protected Message createMessage(ProducerInfo producerInfo, 
ActiveMQDestination destination, int deliveryMode, int timeToLive) { // Builds a message via the 3-argument createMessage, then stamps it to expire timeToLive ms from now.
+        Message message = createMessage(producerInfo, destination, 
deliveryMode);
+        long now = System.currentTimeMillis();
+        message.setTimestamp(now);
+        message.setExpiration(now + timeToLive); // absolute expiry: current time in millis plus timeToLive
+        return message;
+    }
+
+    public void initCombosForTestMessagesWaitingForUsageDecreaseExpire() { // Combination values for testMessagesWaitingForUsageDecreaseExpire; the suffix must match that test's name exactly, as the other initCombosFor* methods in this class do (it was misspelled "Ussage").
+        addCombinationValues("deliveryMode", new Object[] 
{Integer.valueOf(DeliveryMode.NON_PERSISTENT), 
Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType", new Object[] 
{Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), 
Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE),
+                                                              
Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), 
Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception { // Returns a new, unstarted BrokerService with persistence disabled.
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        return broker;
+    }
+
+    protected PolicyEntry getDefaultPolicy() { // Parent policy with a VM pending-subscriber storage policy and a 100 expire-messages period.
+        PolicyEntry policy = super.getDefaultPolicy();
+        // disable spooling
+        policy.setPendingSubscriberPolicy(new 
VMPendingSubscriberMessageStoragePolicy());
+        // use an aggressive expiry period to ensure no deadlock or clash
+        policy.setExpireMessagesPeriod(100);
+        
+        return policy;
+    }
+
+    public void testMessagesWaitingForUsageDecreaseExpire() throws Exception { // Sends m1..m4 (m2 and m4 with a 1000 ms TTL) under a memory limit of 1 and expects only m1 and m3 to reach the consumer.
+
+        // Start a producer
+        final StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        final ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        // Start a consumer on a second connection
+        final StubConnection connection2 = createConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+
+        destination = createDestinationInfo(connection2, connectionInfo2, 
destinationType);
+        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, 
destination);
+        consumerInfo2.setPrefetchSize(1);
+        connection2.request(consumerInfo2);
+
+        // Reduce the limit so that only 1 message can flow through the broker
+        // at a time.
+        broker.getSystemUsage().getMemoryUsage().setLimit(1);
+
+        final Message m1 = createMessage(producerInfo, destination, 
deliveryMode);
+        final Message m2 = createMessage(producerInfo, destination, 
deliveryMode, 1000);
+        final Message m3 = createMessage(producerInfo, destination, 
deliveryMode);
+        final Message m4 = createMessage(producerInfo, destination, 
deliveryMode, 1000);
+
+        // Produce in an async thread since the producer will be getting 
blocked
+        // by the usage manager..
+        new Thread() {
+            public void run() {
+                // m1 and m3 should not expire.. but the others should.
+                try {
+                    connection.send(m1);
+                    connection.send(m2);
+                    connection.send(m3);
+                    connection.send(m4);
+                } catch (Exception e) {
+                    e.printStackTrace(); // NOTE(review): a send failure is only printed here; it does not fail the test
+                }
+            }
+        }.start();
+
+        // Make sure only 1 message was delivered due to prefetch == 1
+        Message m = receiveMessage(connection2);
+        assertNotNull(m);
+        assertEquals(m1.getMessageId(), m.getMessageId());
+        assertNoMessagesLeft(connection); // NOTE(review): this checks the producer connection; the consumer is on connection2 — confirm which was intended
+
+        // Sleep before we ack so that the messages expire on the usage manager
+        Thread.sleep(1500);
+        connection2.send(createAck(consumerInfo2, m, 1, 
MessageAck.STANDARD_ACK_TYPE));
+
+        // The 2nd message received should be m3; m2, the 2nd message sent,
+        // should have expired.
+        m = receiveMessage(connection2);
+        assertNotNull(m);
+        assertEquals(m3.getMessageId(), m.getMessageId());
+
+        // Sleep before we ack so that the messages expire on the usage manager
+        Thread.sleep(1500);
+        connection2.send(createAck(consumerInfo2, m, 1, 
MessageAck.STANDARD_ACK_TYPE));
+
+        // And there should be no messages left now..
+        assertNoMessagesLeft(connection2);
+
+        connection.send(closeConnectionInfo(connectionInfo));
+        connection.send(closeConnectionInfo(connectionInfo2)); // NOTE(review): connectionInfo2 was registered on connection2, yet its close is sent on connection — confirm
+    }
+
+    public void initCombosForTestMessagesInLongTransactionExpire() { // Combination values for testMessagesInLongTransactionExpire: both delivery modes and all four destination types.
+        addCombinationValues("deliveryMode", new Object[] 
{Integer.valueOf(DeliveryMode.PERSISTENT), 
Integer.valueOf(DeliveryMode.NON_PERSISTENT)});
+        addCombinationValues("destinationType", new Object[] 
{Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), 
Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+                                                              
Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), 
Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
+    }
+
+    public void testMessagesInLongTransactionExpire() throws Exception { // Sends four messages in one local transaction (2nd and 4th with a 1000 ms TTL), commits after 1500 ms, and expects only m1 and m3.
+
+        // Start a producer and consumer
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        destination = createDestinationInfo(connection, connectionInfo, 
destinationType);
+
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, 
destination);
+        consumerInfo.setPrefetchSize(1000);
+        connection.send(consumerInfo);
+
+        // Start the tx..
+        LocalTransactionId txid = createLocalTransaction(sessionInfo);
+        connection.send(createBeginTransaction(connectionInfo, txid));
+
+        // m1 and m3 should not expire.. but the others should.
+        Message m1 = createMessage(producerInfo, destination, deliveryMode);
+        m1.setTransactionId(txid);
+        connection.send(m1);
+        Message m = createMessage(producerInfo, destination, deliveryMode, 
1000);
+        m.setTransactionId(txid);
+        connection.send(m);
+        Message m3 = createMessage(producerInfo, destination, deliveryMode);
+        m3.setTransactionId(txid);
+        connection.send(m3);
+        m = createMessage(producerInfo, destination, deliveryMode, 1000); // m is reused for the 4th message, which also has a 1000 ms TTL
+        m.setTransactionId(txid);
+        connection.send(m);
+
+        // Sleep before we commit so that the messages expire on the commit
+        // list..
+        Thread.sleep(1500);
+        connection.send(createCommitTransaction1Phase(connectionInfo, txid));
+
+        m = receiveMessage(connection);
+        assertNotNull(m);
+        assertEquals(m1.getMessageId(), m.getMessageId());
+        connection.send(createAck(consumerInfo, m, 1, 
MessageAck.STANDARD_ACK_TYPE));
+
+        // The 2nd message received should be m3; the 2nd message sent
+        // should have expired.
+        m = receiveMessage(connection);
+        assertNotNull(m);
+        assertEquals(m3.getMessageId(), m.getMessageId());
+        connection.send(createAck(consumerInfo, m, 1, 
MessageAck.STANDARD_ACK_TYPE));
+
+        // And there should be no messages left now..
+        assertNoMessagesLeft(connection);
+
+        connection.send(closeConnectionInfo(connectionInfo));
+    }
+
+    public void initCombosForTestMessagesInSubscriptionPendingListExpire() { // Combination values for testMessagesInSubscriptionPendingListExpire: both delivery modes and all four destination types.
+        addCombinationValues("deliveryMode", new Object[] 
{Integer.valueOf(DeliveryMode.NON_PERSISTENT), 
Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType", new Object[] 
{Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), 
Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
+                                                              
Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), 
Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
+    }
+
+    public void testMessagesInSubscriptionPendingListExpire() throws Exception 
{ // Sends four messages (2nd and 4th with a 1000 ms TTL) to a prefetch-1 consumer, delays the first ack by 1500 ms, and expects only m1 and m3.
+
+        // Start a producer and consumer
+        StubConnection connection = createConnection();
+        ConnectionInfo connectionInfo = createConnectionInfo();
+        SessionInfo sessionInfo = createSessionInfo(connectionInfo);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo);
+        connection.send(connectionInfo);
+        connection.send(sessionInfo);
+        connection.send(producerInfo);
+
+        destination = createDestinationInfo(connection, connectionInfo, 
destinationType);
+
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, 
destination);
+        consumerInfo.setPrefetchSize(1);
+        connection.send(consumerInfo);
+
+        // m1 and m3 should not expire.. but the others should.
+        Message m1 = createMessage(producerInfo, destination, deliveryMode);
+        connection.send(m1);
+        connection.send(createMessage(producerInfo, destination, deliveryMode, 
1000));
+        Message m3 = createMessage(producerInfo, destination, deliveryMode);
+        connection.send(m3);
+        connection.send(createMessage(producerInfo, destination, deliveryMode, 
1000));
+
+        // Make sure only 1 message was delivered due to prefetch == 1
+        Message m = receiveMessage(connection);
+        assertNotNull(m);
+        assertEquals(m1.getMessageId(), m.getMessageId());
+        assertNoMessagesLeft(connection);
+
+        // Sleep before we ack so that the messages expire on the pending 
list..
+        Thread.sleep(1500);
+        connection.send(createAck(consumerInfo, m, 1, 
MessageAck.STANDARD_ACK_TYPE));
+
+        // The 2nd message received should be m3; the 2nd message sent
+        // should have expired.
+        m = receiveMessage(connection);
+        assertNotNull(m);
+        assertEquals(m3.getMessageId(), m.getMessageId());
+        connection.send(createAck(consumerInfo, m, 1, 
MessageAck.STANDARD_ACK_TYPE));
+
+        // And there should be no messages left now..
+        assertNoMessagesLeft(connection);
+
+        connection.send(closeConnectionInfo(connectionInfo));
+    }
+
+    public static Test suite() { // Builds the suite through the inherited suite(Class) helper.
+        return suite(MessageExpirationTest.class);
+    }
+
+    public static void main(String[] args) { // Runs the suite with the JUnit text runner; args is not used.
+        junit.textui.TestRunner.run(suite());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java
new file mode 100644
index 0000000..898256c
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/NioQueueSubscriptionTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+@RunWith(BlockJUnit4ClassRunner.class)
+public class NioQueueSubscriptionTest extends QueueSubscriptionTest { // QueueSubscriptionTest variant whose broker is created from an NIO connector URI on localhost:62621.
+
+    protected static final Logger LOG = 
LoggerFactory.getLogger(NioQueueSubscriptionTest.class);
+
+    private final Map<Thread, Throwable> exceptions = 
Collections.synchronizedMap(new HashMap<Thread, Throwable>()); // failures from worker threads and exception listeners, keyed by reporting thread (a later failure on the same thread replaces the earlier one)
+
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws Exception { // Clients connect over plain tcp:// to the same port the broker is given in createBroker().
+        return new 
ActiveMQConnectionFactory("tcp://localhost:62621?trace=false");
+    }
+
+
+    @Override
+    protected BrokerService createBroker() throws Exception { // Builds a JMX-less broker that deletes all messages on startup and enables optimized dispatch for every queue.
+        BrokerService answer = BrokerFactory.createBroker(new URI(
+            
"broker://nio://localhost:62621?useQueueForAccept=false&persistent=false&wireFormat.maxInactivityDuration=0")); // option prefix corrected from the misspelled "wiewformat.", which presumably left maxInactivityDuration unapplied
+        answer.getManagementContext().setCreateConnector(false);
+        answer.setUseJmx(false);
+        answer.setDeleteAllMessagesOnStartup(true);
+        final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
+        final PolicyEntry entry = new PolicyEntry();
+        entry.setQueue(">"); // ">" is used as the queue pattern for this single policy entry
+        entry.setOptimizedDispatch(true);
+        policyEntries.add(entry);
+
+        final PolicyMap policyMap = new PolicyMap();
+        policyMap.setPolicyEntries(policyEntries);
+        answer.setDestinationPolicy(policyMap);
+        return answer;
+    }
+
+
+    @Ignore("See AMQ-4286")
+    @Test(timeout = 60 * 1000)
+    public void testLotsOfConcurrentConnections() throws Exception { // Opens 400 connections from a cached thread pool and fails if any worker or exception listener recorded an exception.
+        ExecutorService executor = Executors.newCachedThreadPool();
+        final ConnectionFactory factory = createConnectionFactory();
+        int connectionCount = 400;
+        final AtomicInteger threadId = new AtomicInteger(0); // hands each task a distinct id for log messages
+        for (int i = 0; i < connectionCount; i++) {
+            executor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    final int innerId = threadId.incrementAndGet();
+                    try {
+                        ExceptionListener listener = new 
NioQueueSubscriptionTestListener(innerId, exceptions, LOG);
+                        ActiveMQConnection connection = (ActiveMQConnection) 
factory.createConnection();
+                        connection.setExceptionListener(listener);
+                        connection.start();
+                        assertNotNull(connection.getBrokerName());
+                        connections.add(connection); // "connections" is not declared in this class; presumably inherited, and presumably closed by the parent's teardown — confirm
+                    } catch (Exception e) {
+                        LOG.error(">>>> Exception in run() on thread " + 
innerId, e);
+                        exceptions.put(Thread.currentThread(), e);
+                    }
+                }
+            });
+        }
+
+        executor.shutdown();
+        executor.awaitTermination(30, TimeUnit.SECONDS); // NOTE(review): the boolean result is ignored, so a timeout here goes unnoticed
+
+        if (!exceptions.isEmpty()) {
+            LOG.error(">>>> " + exceptions.size() + " exceptions like", 
exceptions.values().iterator().next());
+            fail("unexpected exceptions in worker threads: " + 
exceptions.values().iterator().next());
+        }
+        LOG.info("created " + connectionCount + " connections");
+    }
+}
+
+class NioQueueSubscriptionTestListener implements ExceptionListener { // Logs every JMSException it is handed and records it in a caller-supplied map.
+    private int id = 0; // caller-supplied number, used only in the log message
+    protected Logger LOG; // logger supplied by the caller
+    private final Map<Thread, Throwable> exceptions; // caller-supplied map that receives the exceptions
+
+    public NioQueueSubscriptionTestListener(int id, Map<Thread, Throwable> 
exceptions, Logger log) { // Stores the three arguments as-is; no copies are made.
+        this.id = id;
+        this.exceptions = exceptions;
+        this.LOG = log;
+    }
+
+    @Override
+    public void onException(JMSException exception) {
+        LOG.error(">>>> Exception in onException() on thread " + id, 
exception);
+        exceptions.put(Thread.currentThread(), exception); // keyed by the thread that invoked the listener; a later exception on the same thread replaces the earlier entry
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/OutOfOrderXMLTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/OutOfOrderXMLTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/OutOfOrderXMLTest.java
new file mode 100644
index 0000000..11fbb56
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/OutOfOrderXMLTest.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.net.URI;
+import org.apache.activemq.xbean.XBeanBrokerFactory;
+import org.junit.Test;
+
+// https://issues.apache.org/activemq/browse/AMQ-2939
+/**
+ * Builds a broker from an XBean configuration with XML validation disabled in
+ * the URI ({@code validate=false}). Judging by the resource name, the file
+ * lists its broker elements out of order. The test passes when creating and
+ * stopping the broker raises no exception.
+ */
+public class OutOfOrderXMLTest {
+
+    /** XBean URI of the configuration under test; validation is turned off via the query string. */
+    private static final String CONFIG_URI =
+            "xbean:org/apache/activemq/broker/out-of-order-broker-elements.xml?validate=false";
+
+    @Test
+    public void verifyBrokerCreationWhenXmlOutOfOrderValidationFalse() throws Exception {
+        final URI configLocation = new URI(CONFIG_URI);
+        final BrokerService createdBroker = BrokerFactory.createBroker(configLocation);
+        createdBroker.stop();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ProgressPrinter.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ProgressPrinter.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ProgressPrinter.java
new file mode 100644
index 0000000..dcf4b6e
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ProgressPrinter.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker;
+
+/**
+ * Prints coarse-grained progress to standard output. A {@code "Completed: N%"}
+ * line is written each time the completed percentage falls into a different
+ * {@code interval}-wide bucket than the last printed value. Both reporting
+ * methods are synchronized on the instance.
+ */
+public class ProgressPrinter {
+
+    private final long total;
+    private final long interval;
+    private long percentDone;
+    private long counter;
+
+    /**
+     * @param total    number of work units that corresponds to 100%
+     * @param interval width, in percentage points, of the buckets that separate two printed lines
+     */
+    public ProgressPrinter(long total, long interval) {
+        this.total = total;
+        this.interval = interval;
+    }
+
+    /** Advances the internal unit counter by one and reports the new position. */
+    public synchronized void increment() {
+        update(++counter);
+    }
+
+    /**
+     * Reports that {@code current} units are done. Nothing is printed while
+     * the percentage stays inside the bucket of the last printed value, or
+     * when {@code total} or {@code interval} is zero.
+     *
+     * @param current number of units completed so far
+     */
+    public synchronized void update(long current) {
+        if (total == 0 || interval == 0) {
+            // Both fields are divisors below. With a zero there is no percentage
+            // or bucket to compute, so skip the report instead of failing the
+            // caller with an ArithmeticException.
+            return;
+        }
+        long at = 100 * current / total;
+        if ((percentDone / interval) != (at / interval)) {
+            percentDone = at;
+            System.out.println("Completed: " + percentDone + "%");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/QueueMbeanRestartTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/QueueMbeanRestartTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/QueueMbeanRestartTest.java
new file mode 100644
index 0000000..c004fef
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/QueueMbeanRestartTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.JMXSupport;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks that a queue's JMX MBean is registered after a message has been sent
+ * to the queue, and again after the broker has been stopped and a new broker
+ * has been started without deleting stored messages. The test runs once for
+ * each {@link TestSupport.PersistenceAdapterChoice} returned by
+ * {@link #getTestParameters()}.
+ */
+@RunWith(value = Parameterized.class)
+public class QueueMbeanRestartTest extends TestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(QueueMbeanRestartTest.class);
+
+    BrokerService broker;
+
+    private final TestSupport.PersistenceAdapterChoice persistenceAdapterChoice;
+
+    /** @return one single-element parameter array per persistence adapter to test (KahaDB, LevelDB, JDBC) */
+    @Parameterized.Parameters
+    public static Collection<TestSupport.PersistenceAdapterChoice[]> getTestParameters() {
+        TestSupport.PersistenceAdapterChoice[] kahaDb = {TestSupport.PersistenceAdapterChoice.KahaDB};
+        TestSupport.PersistenceAdapterChoice[] levelDb = {TestSupport.PersistenceAdapterChoice.LevelDB};
+        TestSupport.PersistenceAdapterChoice[] jdbc = {TestSupport.PersistenceAdapterChoice.JDBC};
+        List<TestSupport.PersistenceAdapterChoice[]> choices = new ArrayList<TestSupport.PersistenceAdapterChoice[]>();
+        choices.add(kahaDb);
+        choices.add(levelDb);
+        choices.add(jdbc);
+
+        return choices;
+    }
+
+    public QueueMbeanRestartTest(TestSupport.PersistenceAdapterChoice choice) {
+        this.persistenceAdapterChoice = choice;
+    }
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        topic = false;
+        super.setUp();
+    }
+
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            super.tearDown();
+        } finally {
+            // The broker is created inside the test method, so it is still null
+            // when the test failed before createBroker() assigned it.
+            if (broker != null) {
+                broker.stop();
+            }
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testMBeanPresenceOnRestart() throws Exception {
+        createBroker(true);
+
+        sendMessages();
+        verifyPresenceOfQueueMbean();
+        LOG.info("restart....");
+
+        restartBroker();
+        verifyPresenceOfQueueMbean();
+    }
+
+    /** Stops the current broker, waits five seconds, then starts a new one that keeps stored messages. */
+    private void restartBroker() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+        Thread.sleep(5 * 1000);
+        createBroker(false);
+        broker.waitUntilStarted();
+    }
+
+    /**
+     * Scans all MBean names known to the broker's management context and
+     * asserts that the first one with {@code destinationType=Queue} carries the
+     * encoded name of the test destination. Fails when no queue MBean exists.
+     */
+    private void verifyPresenceOfQueueMbean() throws Exception {
+        for (ObjectName name : broker.getManagementContext().queryNames(null, null)) {
+            LOG.info("candidate :" + name);
+            String type = name.getKeyProperty("destinationType");
+            if (type != null && type.equals("Queue")) {
+                assertEquals(
+                        JMXSupport.encodeObjectNamePart(((ActiveMQQueue) createDestination()).getPhysicalName()),
+                        name.getKeyProperty("destinationName"));
+                LOG.info("found mbbean " + name);
+                return;
+            }
+        }
+        fail("expected to find matching queue mbean for: " + createDestination());
+    }
+
+    /** Sends one empty text message to the test destination and releases the connection afterwards. */
+    private void sendMessages() throws Exception {
+        // Fully qualified so no additional file-level import is needed.
+        javax.jms.Connection connection = createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(createDestination());
+            producer.send(session.createTextMessage());
+        } finally {
+            // Closing the connection also closes its session and producer.
+            connection.close();
+        }
+    }
+
+    /**
+     * Creates and starts a new broker that uses the parameterized persistence adapter.
+     *
+     * @param deleteAll whether the broker deletes all stored messages on startup
+     */
+    private void createBroker(boolean deleteAll) throws Exception {
+        broker = new BrokerService();
+        setPersistenceAdapter(broker, persistenceAdapterChoice);
+
+        broker.setDeleteAllMessagesOnStartup(deleteAll);
+        broker.start();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
new file mode 100644
index 0000000..6c3dc15
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/QueueSubscriptionTest.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsMultipleClientsTestSupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+
+/**
+ * Multi-client scenarios against a non-durable queue destination. Each test
+ * picks a number of producers and consumers, a message size and a prefetch
+ * limit, runs the exchange through {@link #doMultipleClientsTest()}, and then
+ * asserts the total number of messages received and that the destination's
+ * memory usage goes back to zero.
+ */
+@RunWith(BlockJUnit4ClassRunner.class)
+public class QueueSubscriptionTest extends JmsMultipleClientsTestSupport {
+    protected int messageCount = 1000; // messages sent by each producer
+    protected int prefetchCount = 10;
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        durable = false;
+        topic = false;
+    }
+
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testManyProducersOneConsumer() throws Exception {
+        producerCount = 10;
+        consumerCount = 1;
+        messageSize = 1; // 1 byte
+        messageCount = 100;
+        prefetchCount = 10;
+
+        doMultipleClientsTest();
+
+        assertAllReceivedAndMemoryReleased();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
+        producerCount = 1;
+        consumerCount = 2;
+        messageSize = 1024; // 1 KB
+        messageCount = 1000;
+        configurePrefetchOfOne();
+
+        doMultipleClientsTest();
+
+        assertAllReceivedAndMemoryReleased();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
+        producerCount = 1;
+        consumerCount = 2;
+        messageSize = 1024; // 1 KB
+        messageCount = 1000;
+        prefetchCount = messageCount * 2;
+
+        doMultipleClientsTest();
+
+        assertAllReceivedAndMemoryReleased();
+    }
+
+    @Test(timeout = 2 * 60 * 1000)
+    public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
+        producerCount = 1;
+        consumerCount = 2;
+        messageSize = 1024 * 1024 * 1; // 1 MB
+        messageCount = 10;
+        configurePrefetchOfOne();
+
+        doMultipleClientsTest();
+
+        assertAllReceivedAndMemoryReleased();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
+        producerCount = 1;
+        consumerCount = 2;
+        messageSize = 1024 * 1024 * 1; // 1 MB
+        messageCount = 10;
+        prefetchCount = messageCount * 2;
+
+        doMultipleClientsTest();
+
+        assertAllReceivedAndMemoryReleased();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testOneProducerManyConsumersFewMessages() throws Exception {
+        producerCount = 1;
+        consumerCount = 50;
+        messageSize = 1; // 1 byte
+        messageCount = 10;
+        prefetchCount = 10;
+
+        doMultipleClientsTest();
+
+        assertAllReceivedAndMemoryReleased();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testOneProducerManyConsumersManyMessages() throws Exception {
+        producerCount = 1;
+        consumerCount = 50;
+        messageSize = 1; // 1 byte
+        messageCount = 1000;
+        prefetchCount = messageCount / consumerCount;
+        extendMaximumDuration();
+        doMultipleClientsTest();
+
+        assertAllReceivedAndMemoryReleased();
+    }
+
+    @Test(timeout = 2 * 60 * 1000)
+    public void testManyProducersManyConsumers() throws Exception {
+        producerCount = 50;
+        consumerCount = 200;
+        messageSize = 1; // 1 byte
+        messageCount = 100;
+        prefetchCount = 100;
+        extendMaximumDuration();
+        doMultipleClientsTest();
+
+        assertAllReceivedAndMemoryReleased();
+    }
+
+    /** Limits prefetch to a single message and lengthens the wait accordingly. */
+    protected void configurePrefetchOfOne() {
+        prefetchCount = 1;
+
+        // A prefetch of one slows delivery down, so allow more time.
+        extendMaximumDuration();
+    }
+
+    /**
+     * Starts the consumers with the configured prefetch on every policy
+     * setting, starts the producers, and waits until the expected number of
+     * messages has been received. For a topic the expectation is multiplied by
+     * the number of consumers.
+     */
+    public void doMultipleClientsTest() throws Exception {
+        final ActiveMQDestination dest = createDestination();
+
+        ActiveMQConnectionFactory consumerFactory = (ActiveMQConnectionFactory) createConnectionFactory();
+        consumerFactory.getPrefetchPolicy().setAll(prefetchCount);
+        startConsumers(consumerFactory, dest);
+
+        startProducers(dest, messageCount);
+
+        // The wait scales with the number of messages that must be delivered.
+        int expectedDeliveries = messageCount * producerCount;
+        if (dest.isTopic()) {
+            expectedDeliveries *= consumerCount;
+        }
+        waitForAllMessagesToBeReceived(expectedDeliveries);
+    }
+
+    /** Multiplies the maximum duration configured on {@code allMessagesList} by twenty. */
+    private void extendMaximumDuration() {
+        allMessagesList.setMaximumDuration(allMessagesList.getMaximumDuration() * 20);
+    }
+
+    /** Runs the two assertions that conclude every scenario in this class. */
+    private void assertAllReceivedAndMemoryReleased() throws Exception {
+        assertTotalMessagesReceived(messageCount * producerCount);
+        assertDestinationMemoryUsageGoesToZero();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ReconnectWithJMXEnabledTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ReconnectWithJMXEnabledTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ReconnectWithJMXEnabledTest.java
new file mode 100644
index 0000000..181a907
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/ReconnectWithJMXEnabledTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+
+/**
+ * Uses a connection against an embedded broker that has JMX enabled, stops
+ * that broker, creates and starts a new one in the same JVM, and then uses a
+ * fresh connection in the same way.
+ */
+public class ReconnectWithJMXEnabledTest extends EmbeddedBrokerTestSupport {
+
+    protected Connection connection;
+    protected boolean transacted;
+    // Passed to createSession() as the acknowledge mode.
+    protected int authMode = Session.AUTO_ACKNOWLEDGE;
+
+    public void testTestUseConnectionCloseBrokerThenRestartInSameJVM() throws Exception {
+        connection = connectionFactory.createConnection();
+        useConnection(connection);
+        connection.close();
+
+        broker.stop();
+        broker = createBroker();
+        startBroker();
+
+        // The new broker has its own connector, so the factory is rebuilt from
+        // the new broker's connect string.
+        connectionFactory = createConnectionFactory();
+        connection = connectionFactory.createConnection();
+        useConnection(connection);
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        // Port 0 presumably lets the transport choose a free port; the actual
+        // address is read back from the broker in createConnectionFactory().
+        bindAddress = "tcp://localhost:0";
+        super.setUp();
+    }
+
+    /** @return a factory that targets the first transport connector of the current broker */
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } finally {
+            connection = null;
+            // Run the superclass cleanup even when close() throws.
+            super.tearDown();
+        }
+    }
+
+    /** @return an unstarted broker with JMX enabled and one connector bound to {@code bindAddress} */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setUseJmx(true);
+        answer.setPersistent(isPersistent());
+        answer.addConnector(bindAddress);
+        return answer;
+    }
+
+    /**
+     * Exercises the connection: sets the client id {@code "foo"}, starts it,
+     * creates a consumer and a producer on the test destination, sends one
+     * text message, waits a second and closes the consumer. The connection
+     * itself is left open for the caller to close.
+     *
+     * @param connection connection to exercise
+     */
+    protected void useConnection(Connection connection) throws Exception {
+        connection.setClientID("foo");
+        connection.start();
+        Session session = connection.createSession(transacted, authMode);
+        Destination destination = createDestination();
+        MessageConsumer consumer = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+        Message message = session.createTextMessage("Hello World");
+        producer.send(message);
+        Thread.sleep(1000);
+        consumer.close();
+    }
+}

Reply via email to