http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageGroupConfigTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageGroupConfigTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageGroupConfigTest.java
new file mode 100644
index 0000000..112cc46
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageGroupConfigTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.broker.region.group.CachedMessageGroupMap;
+import org.apache.activemq.broker.region.group.MessageGroupHashBucket;
+import org.apache.activemq.broker.region.group.MessageGroupMap;
+import org.apache.activemq.broker.region.group.SimpleMessageGroupMap;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+
+/**
+ * Verifies that the message group map factory type configured on the default
+ * {@link PolicyEntry} decides which {@link MessageGroupMap} implementation a
+ * broker queue ends up using.
+ */
+public class MessageGroupConfigTest extends TestSupport {
+    protected BrokerService broker;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+    }
+
+    /**
+     * Stops the broker started by the test, if any, and always runs the
+     * parent tear down.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            // The broker is only created inside doTestGroupConfiguration, so it
+            // is still null when a test fails before reaching that point.
+            if (broker != null) {
+                broker.stop();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    public void testCachedGroupConfiguration() throws Exception {
+        doTestGroupConfiguration("cached", CachedMessageGroupMap.class);
+    }
+
+    public void testCachedGroupConfigurationWithCacheSize() throws Exception {
+        CachedMessageGroupMap result = (CachedMessageGroupMap) doTestGroupConfiguration("cached?cacheSize=10", CachedMessageGroupMap.class);
+        assertEquals(10, result.getMaximumCacheSize());
+    }
+
+    public void testSimpleGroupConfiguration() throws Exception {
+        doTestGroupConfiguration("simple", SimpleMessageGroupMap.class);
+    }
+
+    public void testBucketGroupConfiguration() throws Exception {
+        doTestGroupConfiguration("bucket", MessageGroupHashBucket.class);
+    }
+
+    public void testBucketGroupConfigurationWithBucketCount() throws Exception {
+        MessageGroupHashBucket result = (MessageGroupHashBucket) doTestGroupConfiguration("bucket?bucketCount=2", MessageGroupHashBucket.class);
+        assertEquals(2, result.getBucketCount());
+    }
+
+    /**
+     * Starts a broker whose default policy uses the given message group map
+     * factory type and checks the map created for a queue.
+     *
+     * @param type the factory type string handed to
+     *        {@link PolicyEntry#setMessageGroupMapFactoryType(String)}; the
+     *        tests also pass query-style options such as
+     *        {@code "cached?cacheSize=10"}
+     * @param classType the implementation the queue's group map must be an
+     *        instance of
+     * @return the group map owned by the broker queue, for further assertions
+     * @throws Exception if the broker cannot be started or queried
+     */
+    public MessageGroupMap doTestGroupConfiguration(String type, Class<?> classType) throws Exception {
+        broker = new BrokerService();
+
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setMessageGroupMapFactoryType(type);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(defaultEntry);
+        broker.setDestinationPolicy(policyMap);
+        broker.start();
+
+        // Force a queue destination.
+        super.topic = false;
+        ActiveMQDestination destination = (ActiveMQDestination) createDestination("org.apache.foo");
+        Queue brokerDestination = (Queue) broker.getDestination(destination);
+
+        assertNotNull(brokerDestination);
+        MessageGroupMap messageGroupMap = brokerDestination.getMessageGroupOwners();
+        assertNotNull(messageGroupMap);
+        // The map must be an instance of the expected class. The reverse check
+        // (actual.isAssignableFrom(expected)) would accept any supertype of
+        // the expected class and reject legitimate subclasses.
+        assertTrue("Expected a " + classType.getName() + " but got " + messageGroupMap.getClass().getName(),
+                classType.isInstance(messageGroupMap));
+        return messageGroupMap;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageListenerDeadLetterTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageListenerDeadLetterTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageListenerDeadLetterTest.java
new file mode 100644
index 0000000..b91f5a8
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/MessageListenerDeadLetterTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Dead letter test in which messages are consumed through a
+ * {@link MessageListener} that rolls back every delivery, and the resulting
+ * dead letter messages are read from a separate session.
+ */
+public class MessageListenerDeadLetterTest extends DeadLetterTestSupport {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(MessageListenerDeadLetterTest.class);
+
+    private int rollbackCount;
+
+    private Session dlqSession;
+
+    // Written on the listener's delivery thread and read on the test thread,
+    // hence volatile.
+    private volatile Error listenerError;
+
+    /**
+     * Sends the messages, rolls each delivery back until the redelivery limit
+     * is exceeded and expects every message to show up on the dead letter
+     * destination in order.
+     */
+    protected void doTest() throws Exception {
+        messageCount = 200;
+        connection.start();
+
+        ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
+        rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
+        LOG.info("Will redeliver messages: " + rollbackCount + " times");
+
+        makeConsumer();
+        makeDlqConsumer();
+
+        sendMessages();
+
+        // now lets receive and rollback N times
+        int maxRollbacks = messageCount * rollbackCount;
+        consumer.setMessageListener(new RollbackMessageListener(maxRollbacks, rollbackCount));
+
+        for (int i = 0; i < messageCount; i++) {
+            Message msg = dlqConsumer.receive(4000);
+            // A failure recorded by the listener is the root cause; report it
+            // in preference to a missing DLQ message.
+            rethrowListenerError();
+            // Check for a timed-out receive before inspecting the message.
+            assertNotNull("Should be a DLQ message for loop: " + i, msg);
+            assertMessage(msg, i);
+        }
+        rethrowListenerError();
+    }
+
+    protected void makeDlqConsumer() throws JMSException {
+        dlqDestination = createDlqDestination();
+
+        LOG.info("Consuming from dead letter on: " + dlqDestination);
+        dlqConsumer = dlqSession.createConsumer(dlqDestination);
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        transactedMode = true;
+        super.setUp();
+        dlqSession = connection.createSession(transactedMode, acknowledgeMode);
+    }
+
+    /**
+     * Closes whatever was opened and always runs the parent tear down, so a
+     * failure during set up is not masked by a NullPointerException here.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            if (dlqConsumer != null) {
+                dlqConsumer.close();
+            }
+            if (dlqSession != null) {
+                dlqSession.close();
+            }
+            if (session != null) {
+                session.close();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Returns the parent's factory configured for three immediate
+     * redeliveries without back-off.
+     */
+    @Override
+    protected ActiveMQConnectionFactory createConnectionFactory()
+            throws Exception {
+        ActiveMQConnectionFactory answer = super.createConnectionFactory();
+        RedeliveryPolicy policy = new RedeliveryPolicy();
+        policy.setMaximumRedeliveries(3);
+        policy.setBackOffMultiplier((short) 1);
+        policy.setRedeliveryDelay(0);
+        policy.setInitialRedeliveryDelay(0);
+        policy.setUseExponentialBackOff(false);
+        answer.setRedeliveryPolicy(policy);
+        return answer;
+    }
+
+    protected Destination createDlqDestination() {
+        return new ActiveMQQueue("ActiveMQ.DLQ");
+    }
+
+    /** Rethrows, on the calling thread, the first failure the listener recorded. */
+    private void rethrowListenerError() {
+        Error failure = listenerError;
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    /**
+     * Remembers the first failure seen by the listener so the test thread can
+     * report it; anything that is not an {@link Error} is wrapped in an
+     * {@link AssertionError} that keeps the original as its cause.
+     */
+    private void recordListenerFailure(Throwable failure) {
+        if (listenerError != null) {
+            // keep the first failure, later ones are usually consequences
+            return;
+        }
+        if (failure instanceof Error) {
+            listenerError = (Error) failure;
+        } else {
+            AssertionError wrapped = new AssertionError("unexpected exception: " + failure);
+            wrapped.initCause(failure);
+            listenerError = wrapped;
+        }
+    }
+
+    /**
+     * Listener that verifies the order of each delivery and then rolls the
+     * session back so the message is redelivered.
+     */
+    class RollbackMessageListener implements MessageListener {
+
+        final int maxRollbacks;
+
+        final int deliveryCount;
+
+        final AtomicInteger rollbacks = new AtomicInteger();
+
+        RollbackMessageListener(int c, int delvery) {
+            maxRollbacks = c;
+            deliveryCount = delvery;
+        }
+
+        public void onMessage(Message message) {
+            try {
+                // every message is delivered deliveryCount times before the
+                // next one is expected
+                int expectedMessageId = rollbacks.get() / deliveryCount;
+                LOG.info("expecting messageId: " + expectedMessageId);
+                assertMessage(message, expectedMessageId);
+                if (rollbacks.incrementAndGet() > maxRollbacks) {
+                    fail("received too many messages, already done too many rollbacks: "
+                            + rollbacks);
+                }
+                session.rollback();
+
+            } catch (Throwable e) {
+                LOG.error("unexpected exception:" + e, e);
+                // propagating assertError to execution task will cause a hang
+                // at shutdown, so nothing is rethrown from here: the failure
+                // is handed to the test thread instead
+                recordListenerFailure(e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java
new file mode 100644
index 0000000..7c53730
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/NoConsumerDeadLetterTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+
+/**
+ * Exercises the {@code sendAdvisoryIfNoConsumers} policy: messages sent to a
+ * destination without consumers are expected to produce "no consumers"
+ * advisory messages, which this test reads in place of a dead letter queue.
+ */
+public class NoConsumerDeadLetterTest extends DeadLetterTestSupport {
+
+    // lets disable the inapplicable tests
+    public void testDurableQueueMessage() throws Exception {
+    }
+
+    public void testDurableTopicMessage() throws Exception {
+    }
+
+    protected void doTest() throws Exception {
+        makeDlqConsumer();
+        sendMessages();
+
+        for (int i = 0; i < messageCount; i++) {
+            Message msg = dlqConsumer.receive(1000);
+            assertNotNull("Should be a message for loop: " + i, msg);
+        }
+    }
+
+    /**
+     * Sends a non-persistent message to a queue without consumers, expects the
+     * advisory for it, and then checks that a consumer created afterwards on a
+     * new connection still receives the message.
+     */
+    public void testConsumerReceivesMessages() throws Exception {
+        this.topic = false;
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
+        connection = (ActiveMQConnection) factory.createConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(getDestination());
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+        Topic advisoryTopic = AdvisorySupport.getNoQueueConsumersAdvisoryTopic(getDestination());
+        MessageConsumer advisoryConsumer = session.createConsumer(advisoryTopic);
+
+        TextMessage msg = session.createTextMessage("Message: x");
+        producer.send(msg);
+
+        Message advisoryMessage = advisoryConsumer.receive(1000);
+        assertNotNull("Advisory message not received", advisoryMessage);
+
+        Thread.sleep(1000);
+
+        // The field is about to be pointed at a second connection; close the
+        // one opened above first, otherwise nothing in this method ever
+        // closes it.
+        connection.close();
+
+        factory = (ActiveMQConnectionFactory) createConnectionFactory();
+        connection = (ActiveMQConnection) factory.createConnection();
+        connection.start();
+
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageConsumer consumer = session.createConsumer(getDestination());
+        Message received = consumer.receive(1000);
+        assertNotNull("Message not received", received);
+    }
+
+    /**
+     * Returns the parent's broker with a default policy that sends an
+     * advisory when a message arrives and there are no consumers.
+     */
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+
+        PolicyEntry policy = new PolicyEntry();
+        policy.setSendAdvisoryIfNoConsumers(true);
+
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+
+        broker.setDestinationPolicy(pMap);
+
+        return broker;
+    }
+
+    /**
+     * The "dead letter" destination of this test is the no-consumers advisory
+     * topic matching the kind of destination under test.
+     */
+    protected Destination createDlqDestination() {
+        if (this.topic) {
+            return AdvisorySupport.getNoTopicConsumersAdvisoryTopic((ActiveMQDestination) getDestination());
+        } else {
+            return AdvisorySupport.getNoQueueConsumersAdvisoryTopic((ActiveMQDestination) getDestination());
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/NoRetryDeadLetterTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/NoRetryDeadLetterTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/NoRetryDeadLetterTest.java
new file mode 100644
index 0000000..a14df74
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/NoRetryDeadLetterTest.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RedeliveryPolicy;
+
+/**
+ * Runs the {@link DeadLetterTest} scenarios with a redelivery policy whose
+ * maximum number of redeliveries is zero.
+ */
+public class NoRetryDeadLetterTest extends DeadLetterTest {
+
+    /**
+     * Returns the parent's connection factory with a fresh
+     * {@link RedeliveryPolicy} that allows no redeliveries.
+     */
+    @Override
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        RedeliveryPolicy noRetries = new RedeliveryPolicy();
+        noRetries.setMaximumRedeliveries(0);
+
+        ActiveMQConnectionFactory factory = super.createConnectionFactory();
+        factory.setRedeliveryPolicy(noRetries);
+        return factory;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/PerDurableConsumerDeadLetterTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/PerDurableConsumerDeadLetterTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/PerDurableConsumerDeadLetterTest.java
new file mode 100644
index 0000000..05be639
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/PerDurableConsumerDeadLetterTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import javax.jms.Destination;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+
+/**
+ * For durable subscriptions, allows one DLQ per subscriber so that poison
+ * messages are not duplicated on the DLQ and so that rejecting consumers can
+ * be identified.
+ * https://issues.apache.org/jira/browse/AMQ-3003
+ */
+public class PerDurableConsumerDeadLetterTest extends DeadLetterTest {
+
+    private static final String CLIENT_ID = "george";
+
+    /**
+     * Returns the parent's broker with a default policy that uses an
+     * {@link IndividualDeadLetterStrategy} handling non-persistent messages
+     * and using one destination per durable subscriber.
+     */
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = super.createBroker();
+
+        IndividualDeadLetterStrategy perSubscriber = new IndividualDeadLetterStrategy();
+        perSubscriber.setProcessNonPersistent(true);
+        perSubscriber.setDestinationPerDurableSubscriber(true);
+
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setDeadLetterStrategy(perSubscriber);
+
+        PolicyMap policies = new PolicyMap();
+        policies.setDefaultEntry(defaultEntry);
+        answer.setDestinationPolicy(policies);
+
+        return answer;
+    }
+
+    @Override
+    protected String createClientId() {
+        return CLIENT_ID;
+    }
+
+    /**
+     * Builds the DLQ name this test expects: a topic or queue prefix, the test
+     * class and test name, and for durable subscribers the
+     * {@code clientId:destination} suffix.
+     */
+    @Override
+    protected Destination createDlqDestination() {
+        StringBuilder name = new StringBuilder(topic ? "ActiveMQ.DLQ.Topic." : "ActiveMQ.DLQ.Queue.");
+        name.append(getClass().getName()).append(".").append(getName());
+        if (durableSubscriber) {
+            // connectionId:SubName
+            name.append(".").append(CLIENT_ID).append(":").append(getDestination().toString());
+        }
+        return new ActiveMQQueue(name.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java
new file mode 100644
index 0000000..448f47c
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/PriorityNetworkDispatchPolicyTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.TopicSubscription;
+import org.apache.activemq.broker.region.policy.PriorityNetworkDispatchPolicy;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.usage.SystemUsage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link PriorityNetworkDispatchPolicy} using hand-built topic
+ * subscriptions that share one network consumer path.
+ */
+public class PriorityNetworkDispatchPolicyTest {
+
+    PriorityNetworkDispatchPolicy underTest = new PriorityNetworkDispatchPolicy();
+    SystemUsage usageManager = new SystemUsage();
+    ConsumerInfo info = new ConsumerInfo();
+    ActiveMQMessage node = new ActiveMQMessage();
+    ConsumerId id = new ConsumerId();
+    ConnectionContext context = new ConnectionContext();
+    BrokerService brokerService = new BrokerService();
+
+    /**
+     * Prepares the template consumer info (a network subscription on topic
+     * "test") and gives the message under dispatch an id.
+     */
+    @Before
+    public void init() throws Exception {
+        info.setDestination(ActiveMQDestination.createDestination("test", ActiveMQDestination.TOPIC_TYPE));
+        info.setConsumerId(id);
+        info.setNetworkSubscription(true);
+        info.setNetworkConsumerPath(new ConsumerId[] {id});
+        node.setMessageId(new MessageId("test:1:1:1:1"));
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        brokerService.stop();
+    }
+
+    /**
+     * Dispatches one message to three copies of the same network consumer
+     * that differ only in priority and expects exactly one enqueue overall.
+     */
+    @Test
+    public void testRemoveLowerPriorityDup() throws Exception {
+
+        List<Subscription> consumers = new ArrayList<Subscription>();
+
+        for (byte priority = 0; priority < 3; priority++) {
+            ConsumerInfo copy = info.copy();
+            copy.setPriority(priority);
+            Subscription subscription = new TopicSubscription(brokerService.getBroker(), context, copy, usageManager);
+            consumers.add(subscription);
+        }
+        underTest.dispatch(node, null, consumers);
+
+        // add up the enqueue counters of all subscriptions
+        long enqueued = 0;
+        for (int idx = 0; idx < consumers.size(); idx++) {
+            enqueued += consumers.get(idx).getEnqueueCounter();
+        }
+        assertEquals("only one sub got message", 1, enqueued);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
new file mode 100644
index 0000000..221f603
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/RoundRobinDispatchPolicyTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.QueueSubscriptionTest;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+/**
+ * Re-runs the {@link QueueSubscriptionTest} scenarios against a broker whose
+ * default policy uses {@link RoundRobinDispatchPolicy}, and adds assertions
+ * about how the messages were spread over the consumers.
+ */
+@RunWith(BlockJUnit4ClassRunner.class)
+public class RoundRobinDispatchPolicyTest extends QueueSubscriptionTest {
+
+    /**
+     * Returns the parent's broker with a round-robin dispatch policy installed
+     * as the default destination policy.
+     */
+    protected BrokerService createBroker() throws Exception {
+        PolicyEntry roundRobinEntry = new PolicyEntry();
+        roundRobinEntry.setDispatchPolicy(new RoundRobinDispatchPolicy());
+
+        PolicyMap policies = new PolicyMap();
+        policies.setDefaultEntry(roundRobinEntry);
+
+        BrokerService answer = super.createBroker();
+        answer.setDestinationPolicy(policies);
+        return answer;
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
+        super.testOneProducerTwoConsumersSmallMessagesOnePrefetch();
+
+        // With a prefetch of one an equal split cannot be guaranteed, so only
+        // require that every consumer received at least one message.
+        assertEachConsumerReceivedAtLeastXMessages(1);
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
+        super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
+        assertMessagesDividedAmongConsumers();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
+        super.testOneProducerTwoConsumersLargeMessagesOnePrefetch();
+
+        // With a prefetch of one an equal split cannot be guaranteed, so only
+        // require that every consumer received at least one message.
+        assertEachConsumerReceivedAtLeastXMessages(1);
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
+        super.testOneProducerTwoConsumersLargeMessagesLargePrefetch();
+        assertMessagesDividedAmongConsumers();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testOneProducerManyConsumersFewMessages() throws Exception {
+        super.testOneProducerManyConsumersFewMessages();
+
+        // Since there are more consumers, each consumer should have received
+        // at most one message only
+        assertMessagesDividedAmongConsumers();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testOneProducerManyConsumersManyMessages() throws Exception {
+        super.testOneProducerManyConsumersManyMessages();
+        assertMessagesDividedAmongConsumers();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testManyProducersManyConsumers() throws Exception {
+        super.testManyProducersManyConsumers();
+        assertMessagesDividedAmongConsumers();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testOneProducerTwoMatchingConsumersOneNotMatchingConsumer() throws Exception {
+        // Create consumer that won't consume any message
+        createMessageConsumer(createConnectionFactory().createConnection(), createDestination(), "JMSPriority<1");
+        super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
+        assertMessagesDividedAmongConsumers();
+    }
+
+    /**
+     * Registers the connection in {@code connections}, creates a selector
+     * based consumer on it and starts the connection.
+     *
+     * @param conn the connection to consume on
+     * @param dest the destination to consume from
+     * @param selector the JMS message selector for the consumer
+     * @return the newly created consumer
+     */
+    protected MessageConsumer createMessageConsumer(Connection conn, Destination dest, String selector) throws Exception {
+        connections.add(conn);
+
+        Session consumerSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final MessageConsumer selectingConsumer = consumerSession.createConsumer(dest, selector);
+        conn.start();
+
+        return selectingConsumer;
+    }
+
+    /**
+     * Asserts that every consumer received between the integer share of all
+     * produced messages and that share plus one.
+     */
+    public void assertMessagesDividedAmongConsumers() {
+        assertEachConsumerReceivedAtLeastXMessages(
+                (messageCount * producerCount) / consumerCount);
+        assertEachConsumerReceivedAtMostXMessages(
+                ((messageCount * producerCount) / consumerCount) + 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/SecureDLQTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/SecureDLQTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/SecureDLQTest.java
new file mode 100644
index 0000000..354031e
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/SecureDLQTest.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.filter.DestinationMap;
+import org.apache.activemq.security.*;
+
+import javax.jms.*;
+
+import static org.apache.activemq.security.SimpleSecurityBrokerSystemTest.*;
+
+/**
+ * Dead-letter-queue test run against a broker that has an
+ * {@link AuthorizationPlugin} and a simple authentication plugin installed.
+ * Regular test traffic uses the {@code user}/{@code password} credentials,
+ * while the DLQ is read over a separate {@code system}/{@code manager} connection.
+ */
+public class SecureDLQTest extends DeadLetterTestSupport {
+
+    Connection dlqConnection;
+    Session dlqSession;
+
+    /**
+     * Builds the authorization map used by the secured broker.
+     * For each of read, write and admin access:
+     * <ul>
+     * <li>queue {@code TEST} is granted to {@code ADMINS} and {@code USERS},</li>
+     * <li>queue {@code ActiveMQ.DLQ} is granted to {@code ADMINS} only,</li>
+     * <li>topics under {@code ActiveMQ.Advisory.>} are granted to {@code WILDCARD}.</li>
+     * </ul>
+     *
+     * @return a {@link SimpleAuthorizationMap} built from the three access maps
+     */
+    public static AuthorizationMap createAuthorizationMap() {
+        DestinationMap readAccess = new DefaultAuthorizationMap();
+        readAccess.put(new ActiveMQQueue("TEST"), ADMINS);
+        readAccess.put(new ActiveMQQueue("TEST"), USERS);
+        readAccess.put(new ActiveMQQueue("ActiveMQ.DLQ"), ADMINS);
+
+        DestinationMap writeAccess = new DefaultAuthorizationMap();
+        writeAccess.put(new ActiveMQQueue("TEST"), ADMINS);
+        writeAccess.put(new ActiveMQQueue("TEST"), USERS);
+        writeAccess.put(new ActiveMQQueue("ActiveMQ.DLQ"), ADMINS);
+
+        readAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD);
+        writeAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD);
+
+        DestinationMap adminAccess = new DefaultAuthorizationMap();
+        adminAccess.put(new ActiveMQQueue("TEST"), ADMINS);
+        adminAccess.put(new ActiveMQQueue("TEST"), USERS);
+        adminAccess.put(new ActiveMQQueue("ActiveMQ.DLQ"), ADMINS);
+        adminAccess.put(new ActiveMQTopic("ActiveMQ.Advisory.>"), WILDCARD);
+
+        return new SimpleAuthorizationMap(writeAccess, readAccess, adminAccess);
+    }
+
+    /**
+     * Creates the broker from the superclass and installs the authorization
+     * plugin (backed by {@link #createAuthorizationMap()}) together with the
+     * simple authentication factory plugin.
+     */
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(createAuthorizationMap());
+
+        broker.setPlugins(new BrokerPlugin[] {authorizationPlugin, new SimpleSecurityBrokerSystemTest.SimpleAuthenticationFactory()});
+        return broker;
+    }
+
+    // The topic variants are not applicable to this test, so they are left empty.
+    public void testTransientTopicMessage() throws Exception {
+    }
+
+    public void testDurableTopicMessage() throws Exception {
+    }
+
+    /**
+     * Sends messages with a one second time-to-live, lets them expire while
+     * no consumer is attached, re-attaches a consumer and then expects
+     * {@code messageCount} messages to arrive on the DLQ.
+     */
+    @Override
+    protected void doTest() throws Exception {
+        timeToLive = 1000;
+        acknowledgeMode = Session.CLIENT_ACKNOWLEDGE;
+        makeConsumer();
+        sendMessages();
+        Thread.sleep(1000);
+        consumer.close();
+
+        Thread.sleep(1000);
+        // this should try to send expired messages to dlq
+        makeConsumer();
+
+        makeDlqConsumer();
+        for (int i = 0; i < messageCount; i++) {
+            Message msg = dlqConsumer.receive(1000);
+            // Null-check first, so a missing message is reported with the loop
+            // index before the message is handed to assertMessage.
+            assertNotNull("Should be a DLQ message for loop: " + i, msg);
+            assertMessage(msg, i);
+        }
+
+    }
+
+    /**
+     * Closes the DLQ connection, if one was opened, and always runs the
+     * superclass tear-down, even when closing the connection fails.
+     */
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            if (dlqConnection != null) {
+                dlqConnection.close();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /** Opens the regular test connection with the {@code user}/{@code password} credentials. */
+    @Override
+    protected Connection createConnection() throws Exception {
+        return getConnectionFactory().createConnection("user", "password");
+    }
+
+    /**
+     * Opens a dedicated, started connection with the {@code system}/{@code manager}
+     * credentials and creates the DLQ consumer on an auto-acknowledge session.
+     */
+    @Override
+    protected void makeDlqConsumer() throws Exception {
+        dlqDestination = createDlqDestination();
+        dlqConnection = getConnectionFactory().createConnection("system", "manager");
+        dlqConnection.start();
+        dlqSession = dlqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        dlqConsumer = dlqSession.createConsumer(dlqDestination);
+    }
+
+    /** @return the queue {@code ActiveMQ.DLQ} */
+    @Override
+    protected Destination createDlqDestination() {
+        return new ActiveMQQueue("ActiveMQ.DLQ");
+    }
+
+    /** @return the name of the test destination, {@code TEST} */
+    @Override
+    protected String getDestinationString() {
+        return "TEST";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
new file mode 100644
index 0000000..d6d6b08
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/SimpleDispatchPolicyTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.QueueSubscriptionTest;
+import 
org.apache.activemq.broker.region.policy.FixedCountSubscriptionRecoveryPolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
+import org.apache.activemq.util.MessageIdList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+
+import javax.jms.MessageConsumer;
+import java.util.Iterator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Runs the inherited queue subscription scenarios against a broker whose
+ * default destination policy uses {@link SimpleDispatchPolicy} together with
+ * a {@link FixedCountSubscriptionRecoveryPolicy}.
+ */
+@RunWith(BlockJUnit4ClassRunner.class)
+public class SimpleDispatchPolicyTest extends QueueSubscriptionTest {
+
+    /**
+     * Creates the superclass broker and installs a default policy entry with
+     * the simple dispatch policy and a fixed-count recovery policy.
+     */
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        final BrokerService configuredBroker = super.createBroker();
+
+        final PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setDispatchPolicy(new SimpleDispatchPolicy());
+        defaultEntry.setSubscriptionRecoveryPolicy(new FixedCountSubscriptionRecoveryPolicy());
+
+        final PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(defaultEntry);
+        configuredBroker.setDestinationPolicy(policyMap);
+
+        return configuredBroker;
+    }
+
+    /**
+     * Runs the inherited small-message scenario. The follow-up check that a
+     * single consumer received everything is currently commented out.
+     */
+    @Test(timeout = 60 * 1000)
+    @Override
+    public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
+        super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
+
+        // Disabled check: one consumer receives all messages, the others none.
+        // assertOneConsumerReceivedAllMessages(messageCount);
+    }
+
+    /**
+     * Runs the inherited large-message scenario. The follow-up check that a
+     * single consumer received everything is currently commented out.
+     */
+    @Test(timeout = 60 * 1000)
+    @Override
+    public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
+        super.testOneProducerTwoConsumersLargeMessagesLargePrefetch();
+
+        // Disabled check: one consumer receives all messages, the others none.
+        // assertOneConsumerReceivedAllMessages(messageCount);
+    }
+
+    /**
+     * Asserts that exactly one consumer received messages and that it
+     * received {@code messageCount} of them.
+     *
+     * @param messageCount number of messages the single receiving consumer must hold
+     */
+    public void assertOneConsumerReceivedAllMessages(int messageCount) throws Exception {
+        boolean receiverSeen = false;
+
+        final Iterator<MessageConsumer> consumerIt = consumers.keySet().iterator();
+        while (consumerIt.hasNext()) {
+            final MessageIdList received = consumers.get(consumerIt.next());
+            final int receivedCount = received.getMessageCount();
+
+            // Consumers that got nothing are exactly what we expect for all but one.
+            if (receivedCount <= 0) {
+                continue;
+            }
+            if (receiverSeen) {
+                fail("No other consumers should have received any messages");
+            }
+            assertEquals("Consumer should have received all messages.", messageCount, receivedCount);
+            receiverSeen = true;
+        }
+
+        if (!receiverSeen) {
+            fail("At least one consumer should have received all messages");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
new file mode 100644
index 0000000..dbeabfe
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.policy;
+
+import java.util.Iterator;
+
+import javax.jms.MessageConsumer;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TopicSubscriptionTest;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy;
+import org.apache.activemq.util.MessageIdList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.BlockJUnit4ClassRunner;
+
+import static org.junit.Assert.*;
+
+/**
+ * Runs the inherited topic subscription scenarios against a broker whose
+ * default destination policy uses {@link StrictOrderDispatchPolicy}, and
+ * after each scenario compares the message lists of all consumers.
+ */
+@RunWith(BlockJUnit4ClassRunner.class)
+public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
+
+    /** Creates the superclass broker and makes strict-order dispatch the default policy. */
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        final BrokerService configuredBroker = super.createBroker();
+
+        final PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setDispatchPolicy(new StrictOrderDispatchPolicy());
+
+        final PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(defaultEntry);
+        configuredBroker.setDestinationPolicy(policyMap);
+
+        return configuredBroker;
+    }
+
+    /** Inherited scenario followed by the ordering check. */
+    @Override
+    @Test
+    public void testOneProducerTwoConsumersLargeMessagesOnePrefetch() throws Exception {
+        super.testOneProducerTwoConsumersLargeMessagesOnePrefetch();
+        assertReceivedMessagesAreOrdered();
+    }
+
+    /** Inherited scenario followed by the ordering check. */
+    @Override
+    @Test
+    public void testOneProducerTwoConsumersSmallMessagesOnePrefetch() throws Exception {
+        super.testOneProducerTwoConsumersSmallMessagesOnePrefetch();
+        assertReceivedMessagesAreOrdered();
+    }
+
+    /** Inherited scenario followed by the ordering check. */
+    @Override
+    @Test
+    public void testOneProducerTwoConsumersSmallMessagesLargePrefetch() throws Exception {
+        super.testOneProducerTwoConsumersSmallMessagesLargePrefetch();
+        assertReceivedMessagesAreOrdered();
+    }
+
+    /** Inherited scenario followed by the ordering check. */
+    @Override
+    @Test
+    public void testOneProducerTwoConsumersLargeMessagesLargePrefetch() throws Exception {
+        super.testOneProducerTwoConsumersLargeMessagesLargePrefetch();
+        assertReceivedMessagesAreOrdered();
+    }
+
+    /** Inherited scenario followed by the ordering check. */
+    @Override
+    @Test
+    public void testOneProducerManyConsumersFewMessages() throws Exception {
+        super.testOneProducerManyConsumersFewMessages();
+        assertReceivedMessagesAreOrdered();
+    }
+
+    /** Inherited scenario followed by the ordering check. */
+    @Override
+    @Test
+    public void testOneProducerManyConsumersManyMessages() throws Exception {
+        super.testOneProducerManyConsumersManyMessages();
+        assertReceivedMessagesAreOrdered();
+    }
+
+    /** Inherited scenario followed by the ordering check. */
+    @Override
+    @Test
+    public void testManyProducersOneConsumer() throws Exception {
+        super.testManyProducersOneConsumer();
+        assertReceivedMessagesAreOrdered();
+    }
+
+    /** Inherited scenario followed by the ordering check. */
+    @Override
+    @Test
+    public void testManyProducersManyConsumers() throws Exception {
+        super.testManyProducersManyConsumers();
+        assertReceivedMessagesAreOrdered();
+    }
+
+    /**
+     * Takes the message list of the first consumer as the reference and
+     * asserts that the list of every other consumer is {@code equals} to it.
+     * Does nothing when there is at most one consumer.
+     */
+    public void assertReceivedMessagesAreOrdered() throws Exception {
+        // With at most one consumer there is nothing to compare against.
+        if (consumers.size() <= 1) {
+            return;
+        }
+
+        final Iterator<MessageConsumer> remaining = consumers.keySet().iterator();
+
+        // The first consumer's list is the basis every other list is compared with.
+        final MessageIdList reference = (MessageIdList)consumers.get(remaining.next());
+
+        while (remaining.hasNext()) {
+            final MessageIdList candidate = (MessageIdList)consumers.get(remaining.next());
+            assertTrue("Messages are not ordered.", reference.equals(candidate));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/cursor.xml
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/cursor.xml
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/cursor.xml
new file mode 100644
index 0000000..d067a2c
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/cursor.xml
@@ -0,0 +1,67 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Copyright 2005-2006 The Apache Software Foundation
+  
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- this file can only be parsed using the xbean-spring library -->
+<!-- START SNIPPET: xbean -->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
+  http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <!-- Non-persistent broker whose policies apply to destinations under "org.apache." -->
+  <broker persistent="false" xmlns="http://activemq.apache.org/schema/core">
+
+    <!-- Destination policies: dead-letter strategy, dispatch policy and pending-message cursor -->
+    <destinationPolicy>
+      <policyMap>
+        <policyEntries>
+          <!-- Topics: flow control off, 1mb memory limit, per-destination DLQ, VM cursor for subscribers -->
+          <policyEntry topic="org.apache.>" producerFlowControl="false" memoryLimit="1mb">
+
+            <deadLetterStrategy>
+              <individualDeadLetterStrategy  topicPrefix="Test.DLQ." />
+            </deadLetterStrategy>
+            <dispatchPolicy>
+              <strictOrderDispatchPolicy />
+            </dispatchPolicy>
+            <pendingSubscriberPolicy>
+               <vmCursor />
+            </pendingSubscriberPolicy>
+          </policyEntry>
+
+          <!-- Queues: per-destination DLQ, VM cursor for pending queue messages -->
+          <policyEntry queue="org.apache.>">
+
+            <deadLetterStrategy>
+              <individualDeadLetterStrategy queuePrefix="Test.DLQ."/>
+            </deadLetterStrategy>
+            <dispatchPolicy>
+              <strictOrderDispatchPolicy />
+            </dispatchPolicy>
+            <pendingQueuePolicy>
+               <vmQueueCursor />
+            </pendingQueuePolicy>
+          </policyEntry>
+
+        </policyEntries>
+      </policyMap>
+    </destinationPolicy>
+  </broker>
+
+</beans>
+<!-- END SNIPPET: xbean -->

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/individual-dlq.xml
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/individual-dlq.xml
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/individual-dlq.xml
new file mode 100644
index 0000000..e79ea92
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/policy/individual-dlq.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+  
+  http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- this file can only be parsed using the xbean-spring library -->
+<!-- START SNIPPET: xbean -->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
+  http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+  <!-- Non-persistent broker whose policies apply to destinations under "org.apache." -->
+  <broker persistent="false" xmlns="http://activemq.apache.org/schema/core">
+
+    <!-- Destination policies: individual dead-letter destinations and strict-order dispatch -->
+    <destinationPolicy>
+      <policyMap>
+        <policyEntries>
+          <!-- Topics: per-destination DLQ with processNonPersistent enabled -->
+          <policyEntry topic="org.apache.>">
+            <deadLetterStrategy>
+              <individualDeadLetterStrategy  topicPrefix="Test.DLQ." processNonPersistent="true" />
+            </deadLetterStrategy>
+            <dispatchPolicy>
+              <strictOrderDispatchPolicy />
+            </dispatchPolicy>
+          </policyEntry>
+
+          <!-- Queues: per-destination DLQ with processNonPersistent enabled -->
+          <policyEntry queue="org.apache.>">
+            <deadLetterStrategy>
+              <individualDeadLetterStrategy queuePrefix="Test.DLQ." processNonPersistent="true"/>
+            </deadLetterStrategy>
+            <dispatchPolicy>
+              <strictOrderDispatchPolicy />
+            </dispatchPolicy>
+          </policyEntry>
+
+        </policyEntries>
+      </policyMap>
+    </destinationPolicy>
+  </broker>
+
+</beans>
+<!-- END SNIPPET: xbean -->

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java
new file mode 100644
index 0000000..17bf2b2
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationGCTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+
+/**
+ * Tests the broker's purging ("GC") of inactive destinations. The broker is
+ * configured with a purge schedule period of 1000, at most one purged
+ * destination per sweep and an inactivity timeout of 3000 before a
+ * destination becomes eligible (values presumably in milliseconds - TODO confirm).
+ */
+public class DestinationGCTest extends EmbeddedBrokerTestSupport {
+
+    ActiveMQQueue queue = new ActiveMQQueue("TEST");
+    ActiveMQQueue otherQueue = new ActiveMQQueue("TEST-OTHER");
+
+    /**
+     * Creates the superclass broker with the {@code TEST} queue pre-created
+     * and a default policy entry that enables GC of inactive destinations.
+     */
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        broker.setDestinations(new ActiveMQDestination[] {queue});
+        broker.setSchedulePeriodForDestinationPurge(1000);
+        broker.setMaxPurgedDestinationsPerSweep(1);
+        PolicyEntry entry = new PolicyEntry();
+        entry.setGcInactiveDestinations(true);
+        entry.setInactiveTimoutBeforeGC(3000);
+        PolicyMap map = new PolicyMap();
+        map.setDefaultEntry(entry);
+        broker.setDestinationPolicy(map);
+        return broker;
+    }
+
+    /**
+     * Keeps a listener-based consumer attached to {@code TEST}, briefly opens
+     * and closes a producer on {@code TEST-OTHER}, waits five seconds and then
+     * expects exactly one queue to remain registered.
+     */
+    public void testDestinationGCWithActiveConsumers() throws Exception {
+        assertEquals(1, broker.getAdminView().getQueues().length);
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?create=false");
+        Connection connection = factory.createConnection();
+        // Close the connection even when an assertion below fails, so it does
+        // not leak into subsequent tests.
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            session.createProducer(otherQueue).close();
+            MessageConsumer consumer = session.createConsumer(queue);
+            consumer.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message message) {
+                }
+            });
+            connection.start();
+
+            TimeUnit.SECONDS.sleep(5);
+
+            assertTrue("After GC runs there should be one Queue.", Wait.waitFor(new Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return broker.getAdminView().getQueues().length == 1;
+                }
+            }));
+        } finally {
+            connection.close();
+        }
+    }
+
+    /** Expects the single, unused {@code TEST} queue to disappear once GC has run. */
+    public void testDestinationGc() throws Exception {
+        assertEquals(1, broker.getAdminView().getQueues().length);
+        assertTrue("After GC runs the Queue should be empty.", Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return broker.getAdminView().getQueues().length == 0;
+            }
+        }));
+    }
+
+    /**
+     * Adds four more queues, sleeps seven seconds and expects that some, but
+     * not all, of the five queues are gone at that point; afterwards waits
+     * until no queue is left.
+     */
+    public void testDestinationGcLimit() throws Exception {
+
+        broker.getAdminView().addQueue("TEST1");
+        broker.getAdminView().addQueue("TEST2");
+        broker.getAdminView().addQueue("TEST3");
+        broker.getAdminView().addQueue("TEST4");
+
+        assertEquals(5, broker.getAdminView().getQueues().length);
+        Thread.sleep(7000);
+        int queues = broker.getAdminView().getQueues().length;
+        assertTrue("Expected some but not all of the 5 queues to be purged, found " + queues, queues > 0 && queues < 5);
+        assertTrue("After GC runs the Queue should be empty.", Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return broker.getAdminView().getQueues().length == 0;
+            }
+        }));
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationRemoveRestartTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationRemoveRestartTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationRemoveRestartTest.java
new file mode 100644
index 0000000..06011fe
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/DestinationRemoveRestartTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import junit.framework.Test;
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+
+// from https://issues.apache.org/activemq/browse/AMQ-2216
+/**
+ * Adds a destination, removes it again, restarts the broker and checks that
+ * the removed destination is not present after the restart. The test is run
+ * for both queue and topic destination types.
+ */
+public class DestinationRemoveRestartTest extends CombinationTestSupport {
+    private static final String destinationName = "TEST";
+    public byte destinationType = ActiveMQDestination.QUEUE_TYPE;
+    BrokerService broker;
+
+    @Override
+    protected void setUp() throws Exception {
+        broker = createBroker();
+    }
+
+    /**
+     * Creates and starts a persistent broker without JMX that deletes all
+     * messages on startup.
+     */
+    private BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setUseJmx(false);
+        broker.setPersistent(true);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.start();
+        return broker;
+    }
+
+    /**
+     * Stops the current broker and waits for it to finish stopping, as the
+     * test body does before its restart. Does nothing if no broker was
+     * created, e.g. because {@link #setUp()} failed.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /** Registers the queue and topic destination types as combination values. */
+    public void initCombosForTestCheckDestinationRemoveActionAfterRestart() {
+        addCombinationValues("destinationType", new Object[]{Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+                Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
+    }
+
+    public void testCheckDestinationRemoveActionAfterRestart() throws Exception {
+        doAddDestination();
+        doRemoveDestination();
+        broker.stop();
+        broker.waitUntilStopped();
+        broker = createBroker();
+        doCheckRemoveActionAfterRestart();
+    }
+
+    /** Adds the test destination to the region broker and asserts that it is then listed. */
+    public void doAddDestination() throws Exception {
+        ActiveMQDestination amqDestination =
+            ActiveMQDestination.createDestination(destinationName, destinationType);
+        broker.getRegionBroker().addDestination(broker.getAdminConnectionContext(), amqDestination, true);
+
+        assertTrue("Adding destination Failed", isTestDestinationRegistered());
+    }
+
+    /** Removes the test destination from the broker and asserts that it is no longer listed. */
+    public void doRemoveDestination() throws Exception {
+        broker.removeDestination(ActiveMQDestination.createDestination(destinationName, destinationType));
+
+        assertTrue("Removing destination Failed", !isTestDestinationRegistered());
+    }
+
+    /** Asserts that the test destination is not listed by the (restarted) broker. */
+    public void doCheckRemoveActionAfterRestart() throws Exception {
+        assertTrue("The removed destination is reloaded after restart !", !isTestDestinationRegistered());
+    }
+
+    /**
+     * Reports whether any destination currently listed by the region broker
+     * has the physical name of the test destination.
+     */
+    private boolean isTestDestinationRegistered() throws Exception {
+        final ActiveMQDestination[] list = broker.getRegionBroker().getDestinations();
+        for (final ActiveMQDestination element : list) {
+            final Destination destination = broker.getDestination(element);
+            if (destination.getActiveMQDestination().getPhysicalName().equals(destinationName)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public static Test suite() {
+        return suite(DestinationRemoveRestartTest.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
new file mode 100644
index 0000000..f69c380
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.InvalidSelectorException;
+import javax.management.ObjectName;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatchNotification;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.management.CountStatisticImpl;
+import org.apache.activemq.state.ProducerState;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author gtully
+ * @see <a href="https://issues.apache.org/activemq/browse/AMQ-2020">AMQ-2020</a>
+ **/
+public class QueueDuplicatesFromStoreTest extends TestCase {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(QueueDuplicatesFromStoreTest.class);
+
+    ActiveMQQueue destination = new ActiveMQQueue("queue-"
+            + QueueDuplicatesFromStoreTest.class.getSimpleName()); // queue under test, named after this class
+    BrokerService brokerService; // assigned in setUp(), stopped in tearDown()
+
+    final static String mesageIdRoot = "11111:22222:"; // message-id prefix; getMessage() appends the message index
+    final int messageBytesSize = 256;
+    final String text = new String(new byte[messageBytesSize]); // filler appended to every message body in getMessage()
+
+    final int ackStartIndex = 100; // acking begins once more than this many messages have been received
+    final int ackWindow = 50; // a batch is acked when the received count reaches removeIndex + ackWindow
+    final int ackBatchSize = 50; // number of messages acked per batch
+    final int fullWindow = 200; // the stub subscription reports full at this many received-but-unacked messages
+    protected int count = 5000; // number of messages sent to the queue
+
+    /**
+     * Obtains the broker from {@link #createBroker()}, turns JMX off, calls
+     * {@code deleteAllMessages()} and starts it.
+     */
+    @Override
+    public void setUp() throws Exception {
+        final BrokerService freshBroker = createBroker();
+        brokerService = freshBroker;
+        freshBroker.setUseJmx(false);
+        freshBroker.deleteAllMessages();
+        freshBroker.start();
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        return new BrokerService();
+    }
+
+    /**
+     * Stops the broker that {@link #setUp()} started.
+     */
+    @Override
+    public void tearDown() throws Exception {
+        this.brokerService.stop();
+    }
+
+    /**
+     * Runs the shared scenario with a max audit depth of 10240.
+     */
+    public void testNoDuplicateAfterCacheFullAndAckedWithLargeAuditDepth() throws Exception {
+        final int largeAuditDepth = 10 * 1024;
+        doTestNoDuplicateAfterCacheFullAndAcked(largeAuditDepth);
+    }
+
+    /**
+     * Runs the shared scenario with a max audit depth of 512.
+     */
+    public void testNoDuplicateAfterCacheFullAndAckedWithSmallAuditDepth() throws Exception {
+        final int smallAuditDepth = 512;
+        doTestNoDuplicateAfterCacheFullAndAcked(smallAuditDepth);
+    }
+
+    /**
+     * Sends {@code count} messages to a queue backed by the broker's
+     * persistence adapter, then attaches a stub subscription that accepts at
+     * most {@code fullWindow} unacknowledged messages while this thread
+     * acknowledges them in batches of {@code ackBatchSize}. The test fails if
+     * a message arrives out of producer-sequence order, or if the received
+     * count or the final store count is not as expected.
+     *
+     * @param auditDepth value handed to {@code queue.setMaxAuditDepth}
+     * @throws Exception on any broker, store or queue failure
+     */
+    public void doTestNoDuplicateAfterCacheFullAndAcked(final int auditDepth) throws Exception {
+        final PersistenceAdapter persistenceAdapter = brokerService.getPersistenceAdapter();
+        final MessageStore queueMessageStore =
+            persistenceAdapter.createQueueMessageStore(destination);
+        final ConnectionContext contextNotInTx = new ConnectionContext();
+        final ConsumerInfo consumerInfo = new ConsumerInfo();
+        final DestinationStatistics destinationStatistics = new DestinationStatistics();
+        consumerInfo.setExclusive(true);
+        final Queue queue = new Queue(brokerService, destination,
+                queueMessageStore, destinationStatistics, brokerService.getTaskRunnerFactory());
+
+        // a workaround for this issue
+        // queue.setUseCache(false);
+        queue.systemUsage.getMemoryUsage().setLimit(1024 * 1024 * 10);
+        queue.setMaxAuditDepth(auditDepth);
+        queue.initialize();
+        queue.start();
+
+
+        ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
+        ProducerInfo producerInfo = new ProducerInfo();
+        ProducerState producerState = new ProducerState(producerInfo);
+        producerExchange.setProducerState(producerState);
+        producerExchange.setConnectionContext(contextNotInTx);
+
+        final CountDownLatch receivedLatch = new CountDownLatch(count);
+        final AtomicLong ackedCount = new AtomicLong(0);
+        final AtomicLong enqueueCounter = new AtomicLong(0);
+        final Vector<String> errors = new Vector<String>();
+
+        // populate the queue store, exceed memory limit so that cache is disabled
+        for (int i = 0; i < count; i++) {
+            Message message = getMessage(i);
+            queue.send(producerExchange, message);
+        }
+
+        assertEquals("store count is correct", count, queueMessageStore.getMessageCount());
+
+        // pull from store in small windows
+        Subscription subscription = new Subscription() {
+
+            // Records an error when the message's producer sequence id differs
+            // from the number of messages received so far, then counts it.
+            @Override
+            public void add(MessageReference node) throws Exception {
+                if (enqueueCounter.get() != node.getMessageId().getProducerSequenceId()) {
+                    errors.add("Not in sequence at: " + enqueueCounter.get() + ", received: "
+                            + node.getMessageId().getProducerSequenceId());
+                }
+                assertEquals("is in order", enqueueCounter.get(), node
+                        .getMessageId().getProducerSequenceId());
+                receivedLatch.countDown();
+                enqueueCounter.incrementAndGet();
+                node.decrementReferenceCount();
+            }
+
+            @Override
+            public void add(ConnectionContext context, Destination destination)
+                    throws Exception {
+            }
+
+            // Remaining room in the window of received-but-unacked messages.
+            @Override
+            public int countBeforeFull() {
+                if (isFull()) {
+                    return 0;
+                } else {
+                    return fullWindow - (int) (enqueueCounter.get() - ackedCount.get());
+                }
+            }
+
+            @Override
+            public void destroy() {
+            }
+
+            @Override
+            public void gc() {
+            }
+
+            @Override
+            public ConsumerInfo getConsumerInfo() {
+                return consumerInfo;
+            }
+
+            @Override
+            public ConnectionContext getContext() {
+                return null;
+            }
+
+            @Override
+            public long getDequeueCounter() {
+                return 0;
+            }
+
+            @Override
+            public long getDispatchedCounter() {
+                return 0;
+            }
+
+            @Override
+            public int getDispatchedQueueSize() {
+                return 0;
+            }
+
+            @Override
+            public long getEnqueueCounter() {
+                return 0;
+            }
+
+            @Override
+            public int getInFlightSize() {
+                return 0;
+            }
+
+            @Override
+            public int getInFlightUsage() {
+                return 0;
+            }
+
+            @Override
+            public ObjectName getObjectName() {
+                return null;
+            }
+
+            @Override
+            public int getPendingQueueSize() {
+                return 0;
+            }
+
+            @Override
+            public int getPrefetchSize() {
+                return 0;
+            }
+
+            @Override
+            public String getSelector() {
+                return null;
+            }
+
+            @Override
+            public boolean isBrowser() {
+                return false;
+            }
+
+            // Full once fullWindow messages have been received but not yet acked.
+            @Override
+            public boolean isFull() {
+                return (enqueueCounter.get() - ackedCount.get()) >= fullWindow;
+            }
+
+            @Override
+            public boolean isHighWaterMark() {
+                return false;
+            }
+
+            @Override
+            public boolean isLowWaterMark() {
+                return false;
+            }
+
+            @Override
+            public boolean isRecoveryRequired() {
+                return false;
+            }
+
+            @Override
+            public boolean matches(MessageReference node,
+                    MessageEvaluationContext context) throws IOException {
+                return true;
+            }
+
+            @Override
+            public boolean matches(ActiveMQDestination destination) {
+                return true;
+            }
+
+            @Override
+            public void processMessageDispatchNotification(
+                    MessageDispatchNotification mdn) throws Exception {
+            }
+
+            @Override
+            public Response pullMessage(ConnectionContext context,
+                    MessagePull pull) throws Exception {
+                return null;
+            }
+
+            @Override
+            public boolean isWildcard() {
+                return false;
+            }
+
+            @Override
+            public List<MessageReference> remove(ConnectionContext context,
+                    Destination destination) throws Exception {
+                return null;
+            }
+
+            @Override
+            public void setObjectName(ObjectName objectName) {
+            }
+
+            @Override
+            public void setSelector(String selector)
+                    throws InvalidSelectorException,
+                    UnsupportedOperationException {
+            }
+
+            @Override
+            public void updateConsumerPrefetch(int newPrefetch) {
+            }
+
+            @Override
+            public boolean addRecoveredMessage(ConnectionContext context,
+                    MessageReference message) throws Exception {
+                return false;
+            }
+
+            @Override
+            public ActiveMQDestination getActiveMQDestination() {
+                return destination;
+            }
+
+            @Override
+            public void acknowledge(ConnectionContext context, MessageAck ack)
+                    throws Exception {
+            }
+
+            @Override
+            public int getCursorMemoryHighWaterMark(){
+                return 0;
+            }
+
+            @Override
+            public void setCursorMemoryHighWaterMark(
+                    int cursorMemoryHighWaterMark) {
+            }
+
+            @Override
+            public boolean isSlowConsumer() {
+                return false;
+            }
+
+            @Override
+            public void unmatched(MessageReference node) throws IOException {
+            }
+
+            @Override
+            public long getTimeOfLastMessageAck() {
+                return 0;
+            }
+
+            @Override
+            public long getConsumedCount() {
+                return 0;
+            }
+
+            public void incrementConsumedCount(){
+
+            }
+
+            public void resetConsumedCount(){
+
+            }
+        };
+
+        queue.addSubscription(contextNotInTx, subscription);
+        int removeIndex = 0;
+        do {
+            // Simulate periodic acks in small but recent windows
+            long receivedCount = enqueueCounter.get();
+            if (receivedCount > ackStartIndex) {
+                if (receivedCount >= removeIndex + ackWindow) {
+                    for (int j = 0; j < ackBatchSize; j++, removeIndex++) {
+                        ackedCount.incrementAndGet();
+                        MessageAck ack = new MessageAck();
+                        ack.setLastMessageId(new MessageId(mesageIdRoot
+                                + removeIndex));
+                        ack.setMessageCount(1);
+                        queue.removeMessage(contextNotInTx, subscription,
+                                new IndirectMessageReference(
+                                        getMessage(removeIndex)), ack);
+                        queue.wakeup();
+
+                    }
+                    if (removeIndex % 1000 == 0) {
+                        LOG.info("acked: " + removeIndex);
+                        persistenceAdapter.checkpoint(true);
+                    }
+                }
+            }
+
+            // zero-timeout await: poll the latch without blocking, stop early on any error
+        } while (!receivedLatch.await(0, TimeUnit.MILLISECONDS) && errors.isEmpty());
+
+        assertTrue("There are no errors: " + errors, errors.isEmpty());
+        assertEquals(count, enqueueCounter.get());
+        assertEquals("store count is correct", count - removeIndex,
+                queueMessageStore.getMessageCount());
+    }
+
+    /**
+     * Builds a persistent text message for {@code destination} whose id is
+     * {@code mesageIdRoot + i} and whose body is {@code "Msg:" + i + " " + text}.
+     *
+     * @param i message index; also the expected producer sequence id
+     * @return the populated message
+     * @throws Exception if setting the message text fails
+     */
+    private Message getMessage(int i) throws Exception {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        message.setMessageId(new MessageId(mesageIdRoot + i));
+        message.setDestination(destination);
+        message.setPersistent(true);
+        message.setResponseRequired(true);
+        message.setText("Msg:" + i + " " + text);
+        // expected value goes first so a failure reports the values the right way round
+        assertEquals(i, message.getMessageId().getProducerSequenceId());
+        return message;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueOptimizedDispatchExceptionTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueOptimizedDispatchExceptionTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueOptimizedDispatchExceptionTest.java
new file mode 100644
index 0000000..b46169e
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueOptimizedDispatchExceptionTest.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.Connection;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.Connector;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionControl;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.state.ProducerState;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.usage.MemoryUsage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QueueOptimizedDispatchExceptionTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(QueueOptimizedDispatchExceptionTest.class);
+
+    private static final String brokerName = "testBroker";
+    private static final String brokerUrl = "vm://" + brokerName; // connector URI added to the broker in setUp()
+    private static final int count = 50; // number of messages sent to the queue by the test
+
+    private final static String mesageIdRoot = "11111:22222:"; // message-id prefix; getMessage() appends the message index
+    private final ActiveMQQueue destination = new ActiveMQQueue("queue-"
+            + QueueOptimizedDispatchExceptionTest.class.getSimpleName()); // queue under test, named after this class
+    private final int messageBytesSize = 256;
+    private final String text = new String(new byte[messageBytesSize]); // filler appended to every message body in getMessage()
+
+    private BrokerService broker; // created in setUp(), stopped in tearDown()
+
+    /**
+     * Creates a non-persistent broker named {@code brokerName} with a
+     * connector on {@code brokerUrl}, starts it and waits until it is up.
+     */
+    @Before
+    public void setUp() throws Exception {
+        final BrokerService service = new BrokerService();
+        broker = service;
+
+        // Configuration: no persistence, scheduler, JMX or shutdown hook
+        service.setBrokerName(brokerName);
+        service.setPersistent(false);
+        service.setSchedulerSupport(false);
+        service.setUseJmx(false);
+        service.setUseShutdownHook(false);
+        service.addConnector(brokerUrl);
+
+        // Bring the broker up and block until startup has finished
+        service.start();
+        service.waitUntilStarted();
+    }
+
+    /**
+     * Stops the broker and waits until it has stopped. Does nothing when the
+     * broker was never created.
+     */
+    @After
+    public void tearDown() throws Exception {
+        // JUnit 4 runs @After methods even when @Before threw, so the broker
+        // may be null here; avoid an NPE that would obscure the real failure.
+        if (broker == null) {
+            return;
+        }
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    /**
+     * {@code MemoryUsage} stub whose {@link #isFull()} answer is set by the
+     * test rather than computed. It starts out reporting full. Declared
+     * static because it uses no state of the enclosing test instance.
+     */
+    private static class MockMemoryUsage extends MemoryUsage {
+
+        private boolean full = true;
+
+        /** Sets the value that {@link #isFull()} will report from now on. */
+        public void setFull(boolean full) {
+            this.full = full;
+        }
+
+        @Override
+        public boolean isFull() {
+            return full;
+        }
+    }
+
+    /**
+     * Sends {@code count} messages to a queue that has optimized dispatch
+     * enabled while the mocked memory usage reports full, then switches the
+     * mock to not-full and calls {@code queue.wakeup()}. The test fails if
+     * {@code wakeup()} throws (presumably guarding against a
+     * ConcurrentModificationException, going by the method name).
+     *
+     * @throws Exception on any broker, store or queue failure during setup
+     */
+    @Test
+    public void TestOptimizedDispatchCME() throws Exception {
+        final PersistenceAdapter persistenceAdapter = broker.getPersistenceAdapter();
+        final MessageStore queueMessageStore =
+            persistenceAdapter.createQueueMessageStore(destination);
+        final ConnectionContext contextNotInTx = new ConnectionContext();
+        // Stub connection: reports itself connected, every other method is a no-op
+        contextNotInTx.setConnection(new Connection() {
+
+            @Override
+            public void stop() throws Exception {
+            }
+
+            @Override
+            public void start() throws Exception {
+            }
+
+            @Override
+            public void updateClient(ConnectionControl control) {
+            }
+
+            @Override
+            public void serviceExceptionAsync(IOException e) {
+            }
+
+            @Override
+            public void serviceException(Throwable error) {
+            }
+
+            @Override
+            public Response service(Command command) {
+                return null;
+            }
+
+            @Override
+            public boolean isSlow() {
+                return false;
+            }
+
+            @Override
+            public boolean isNetworkConnection() {
+                return false;
+            }
+
+            @Override
+            public boolean isManageable() {
+                return false;
+            }
+
+            @Override
+            public boolean isFaultTolerantConnection() {
+                return false;
+            }
+
+            @Override
+            public boolean isConnected() {
+                return true;
+            }
+
+            @Override
+            public boolean isBlocked() {
+                return false;
+            }
+
+            @Override
+            public boolean isActive() {
+                return false;
+            }
+
+            @Override
+            public ConnectionStatistics getStatistics() {
+                return null;
+            }
+
+            @Override
+            public String getRemoteAddress() {
+                return null;
+            }
+
+            @Override
+            public int getDispatchQueueSize() {
+                return 0;
+            }
+
+            @Override
+            public Connector getConnector() {
+                return null;
+            }
+
+            @Override
+            public String getConnectionId() {
+                return null;
+            }
+
+            @Override
+            public void dispatchSync(Command message) {
+            }
+
+            @Override
+            public void dispatchAsync(Command command) {
+            }
+
+            @Override
+            public int getActiveTransactionCount() {
+                return 0;
+            }
+
+            @Override
+            public Long getOldestActiveTransactionDuration() {
+                return null;
+            }
+        });
+
+        final DestinationStatistics destinationStatistics = new DestinationStatistics();
+        final Queue queue = new Queue(broker, destination,
+                queueMessageStore, destinationStatistics, broker.getTaskRunnerFactory());
+
+        final MockMemoryUsage usage = new MockMemoryUsage();
+
+        queue.setOptimizedDispatch(true);
+        queue.initialize();
+        queue.start();
+        // swap in the mock only after start(), so the queue sees "memory full" while sending
+        queue.memoryUsage = usage;
+
+        ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
+        ProducerInfo producerInfo = new ProducerInfo();
+        ProducerState producerState = new ProducerState(producerInfo);
+        producerExchange.setProducerState(producerState);
+        producerExchange.setConnectionContext(contextNotInTx);
+
+        // populate the queue store, exceed memory limit so that cache is disabled
+        for (int i = 0; i < count; i++) {
+            Message message = getMessage(i);
+            queue.send(producerExchange, message);
+        }
+
+        usage.setFull(false);
+
+        try {
+            queue.wakeup();
+        } catch(Exception e) {
+            // pass the exception itself so the stack trace is logged, not just its toString()
+            LOG.error("Queue threw an unexpected exception: " + e.toString(), e);
+            fail("Should not throw an exception.");
+        }
+    }
+
+    /**
+     * Builds a non-persistent text message for {@code destination} whose id is
+     * {@code mesageIdRoot + i} and whose body is {@code "Msg:" + i + " " + text}.
+     *
+     * @param i message index; also the expected producer sequence id
+     * @return the populated message
+     * @throws Exception if setting the message text fails
+     */
+    private Message getMessage(int i) throws Exception {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        message.setMessageId(new MessageId(mesageIdRoot + i));
+        message.setDestination(destination);
+        message.setPersistent(false);
+        message.setResponseRequired(true);
+        message.setText("Msg:" + i + " " + text);
+        // expected value goes first so a failure reports the values the right way round
+        assertEquals(i, message.getMessageId().getProducerSequenceId());
+        return message;
+    }
+}

Reply via email to