http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
new file mode 100644
index 0000000..6c40239
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Iterator;
+
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.util.IdGenerator;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link PrioritizedPendingList}: insertion at either end,
+ * clearing, removal, size accounting and priority-based iteration.
+ */
+public class PrioritizedPendingListTest {
+
+    /** Number of references most tests place in the list. */
+    private static final int MESSAGE_COUNT = 5;
+
+    @Test
+    public void testAddMessageFirst() {
+        PrioritizedPendingList list = new PrioritizedPendingList();
+        addFirst(list, MESSAGE_COUNT);
+
+        assertEquals(MESSAGE_COUNT, list.size());
+
+        // Every reference has the same priority, so the most recently
+        // prepended one (highest sequence id) is expected first.
+        assertDescendingSequenceIds(list);
+    }
+
+    @Test
+    public void testAddMessageLast() {
+        PrioritizedPendingList list = new PrioritizedPendingList();
+
+        for (int sequenceId = 1; sequenceId <= MESSAGE_COUNT; sequenceId++) {
+            list.addMessageLast(new TestMessageReference(sequenceId));
+        }
+
+        assertEquals(MESSAGE_COUNT, list.size());
+
+        Iterator<MessageReference> iter = list.iterator();
+        long expectedId = 1;
+        while (iter.hasNext()) {
+            assertEquals(expectedId++, iter.next().getMessageId().getProducerSequenceId());
+        }
+        // Guard against an iterator that silently yields fewer elements.
+        assertEquals("iterator should visit every element", MESSAGE_COUNT + 1, expectedId);
+    }
+
+    @Test
+    public void testClear() {
+        PrioritizedPendingList list = new PrioritizedPendingList();
+        addFirst(list, MESSAGE_COUNT);
+
+        assertFalse(list.isEmpty());
+        assertEquals(MESSAGE_COUNT, list.size());
+
+        list.clear();
+
+        assertTrue(list.isEmpty());
+        assertEquals(0, list.size());
+
+        // The list must remain usable after clear(), whichever end is used.
+        list.addMessageFirst(new TestMessageReference(1));
+        list.addMessageLast(new TestMessageReference(2));
+        list.addMessageLast(new TestMessageReference(3));
+        list.addMessageFirst(new TestMessageReference(4));
+        list.addMessageLast(new TestMessageReference(5));
+
+        assertFalse(list.isEmpty());
+        assertEquals(MESSAGE_COUNT, list.size());
+    }
+
+    @Test
+    public void testIsEmpty() {
+        PrioritizedPendingList list = new PrioritizedPendingList();
+        assertTrue(list.isEmpty());
+
+        addFirst(list, MESSAGE_COUNT);
+
+        assertFalse(list.isEmpty());
+        list.clear();
+        assertTrue(list.isEmpty());
+    }
+
+    @Test
+    public void testRemove() {
+        PrioritizedPendingList list = new PrioritizedPendingList();
+
+        TestMessageReference toRemove = new TestMessageReference(6);
+
+        addFirst(list, MESSAGE_COUNT);
+        assertEquals(MESSAGE_COUNT, list.size());
+
+        list.addMessageLast(toRemove);
+        assertEquals(MESSAGE_COUNT + 1, list.size());
+
+        list.remove(toRemove);
+        assertEquals(MESSAGE_COUNT, list.size());
+
+        // Removing a reference that is no longer present must be a no-op.
+        list.remove(toRemove);
+        assertEquals(MESSAGE_COUNT, list.size());
+
+        assertDescendingSequenceIds(list);
+
+        // Removing null must not throw.
+        list.remove(null);
+    }
+
+    @Test
+    public void testSize() {
+        PrioritizedPendingList list = new PrioritizedPendingList();
+        assertTrue(list.isEmpty());
+
+        assertEquals(0, list.size());
+        list.addMessageFirst(new TestMessageReference(1));
+        assertEquals(1, list.size());
+        list.addMessageLast(new TestMessageReference(2));
+        assertEquals(2, list.size());
+        list.addMessageFirst(new TestMessageReference(3));
+        assertEquals(3, list.size());
+        list.addMessageLast(new TestMessageReference(4));
+        assertEquals(4, list.size());
+        list.addMessageFirst(new TestMessageReference(5));
+        assertEquals(5, list.size());
+
+        assertFalse(list.isEmpty());
+        list.clear();
+        assertTrue(list.isEmpty());
+        assertEquals(0, list.size());
+    }
+
+    @Test
+    public void testPrioritization() {
+        PrioritizedPendingList list = new PrioritizedPendingList();
+
+        // Sequence id and priority run in opposite directions so that the
+        // assertion below can only pass if iteration is ordered by priority.
+        list.addMessageFirst(new TestMessageReference(1, 5));
+        list.addMessageFirst(new TestMessageReference(2, 4));
+        list.addMessageFirst(new TestMessageReference(3, 3));
+        list.addMessageFirst(new TestMessageReference(4, 2));
+        list.addMessageFirst(new TestMessageReference(5, 1));
+
+        assertEquals(MESSAGE_COUNT, list.size());
+
+        Iterator<MessageReference> iter = list.iterator();
+        int expectedPriority = list.size();
+        while (iter.hasNext()) {
+            assertEquals(expectedPriority--, iter.next().getMessage().getPriority());
+        }
+        assertEquals("iterator should visit every element", 0, expectedPriority);
+    }
+
+    /**
+     * Prepends {@code count} default-priority references with sequence ids
+     * {@code 1..count}, in that order, using {@code addMessageFirst}.
+     */
+    private static void addFirst(PrioritizedPendingList list, int count) {
+        for (int sequenceId = 1; sequenceId <= count; sequenceId++) {
+            list.addMessageFirst(new TestMessageReference(sequenceId));
+        }
+    }
+
+    /**
+     * Asserts that iterating {@code list} yields producer sequence ids counting
+     * down from {@code list.size()} to 1, and that every element is visited.
+     */
+    private static void assertDescendingSequenceIds(PrioritizedPendingList list) {
+        long expectedId = list.size();
+        Iterator<MessageReference> iter = list.iterator();
+        while (iter.hasNext()) {
+            assertEquals(expectedId--, iter.next().getMessageId().getProducerSequenceId());
+        }
+        assertEquals("iterator should visit every element", 0, expectedId);
+    }
+
+    /**
+     * Minimal {@link MessageReference} stub carrying only a message id, a
+     * priority and a reference counter; every other accessor returns a fixed
+     * neutral value.
+     */
+    static class TestMessageReference implements MessageReference {
+
+        private static final IdGenerator ID_GENERATOR = new IdGenerator();
+
+        private final Message message;
+        private final MessageId messageId;
+        private int referenceCount = 0;
+
+        /**
+         * Creates a reference with the JMS default priority.
+         *
+         * @param sequenceId producer sequence id stored in the message id
+         */
+        public TestMessageReference(int sequenceId) {
+            this(sequenceId, javax.jms.Message.DEFAULT_PRIORITY);
+        }
+
+        /**
+         * Creates a reference with an explicit priority.
+         *
+         * @param sequenceId producer sequence id stored in the message id
+         * @param priority   message priority; narrowed to {@code byte}
+         */
+        public TestMessageReference(int sequenceId, int priority) {
+            messageId = new MessageId(ID_GENERATOR.generateId() + ":1", sequenceId);
+            message = new ActiveMQMessage();
+            message.setPriority((byte) priority);
+        }
+
+        @Override
+        public MessageId getMessageId() {
+            return messageId;
+        }
+
+        @Override
+        public Message getMessageHardRef() {
+            return null;
+        }
+
+        @Override
+        public Message getMessage() {
+            return message;
+        }
+
+        @Override
+        public boolean isPersistent() {
+            return false;
+        }
+
+        @Override
+        public Destination getRegionDestination() {
+            return null;
+        }
+
+        @Override
+        public int getRedeliveryCounter() {
+            return 0;
+        }
+
+        @Override
+        public void incrementRedeliveryCounter() {
+        }
+
+        @Override
+        public int getReferenceCount() {
+            return this.referenceCount;
+        }
+
+        @Override
+        public int incrementReferenceCount() {
+            // Pre-increment so the returned value is the updated count and
+            // agrees with getReferenceCount() immediately afterwards.
+            return ++this.referenceCount;
+        }
+
+        @Override
+        public int decrementReferenceCount() {
+            // Pre-decrement for the same reason as incrementReferenceCount().
+            return --this.referenceCount;
+        }
+
+        @Override
+        public ConsumerId getTargetConsumerId() {
+            return null;
+        }
+
+        @Override
+        public int getSize() {
+            return 1;
+        }
+
+        @Override
+        public long getExpiration() {
+            return 0;
+        }
+
+        @Override
+        public String getGroupID() {
+            return null;
+        }
+
+        @Override
+        public int getGroupSequence() {
+            return 0;
+        }
+
+        @Override
+        public boolean isExpired() {
+            return false;
+        }
+
+        @Override
+        public boolean isDropped() {
+            return false;
+        }
+
+        @Override
+        public boolean isAdvisory() {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreBasedCursorTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreBasedCursorTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreBasedCursorTest.java
new file mode 100644
index 0000000..a330723
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreBasedCursorTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+/**
+ * A StoreBasedCursorTest
+ *
+ */
+
+import java.util.Date;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.usage.SystemUsage;
+
+/**
+ * Sends a fixed batch of text messages through a broker whose system memory
+ * usage limit is deliberately small, and fails if any send is rejected with a
+ * {@code ResourceAllocationException}.
+ */
+public class StoreBasedCursorTest extends TestCase {
+    protected String bindAddress = "tcp://localhost:60706";
+    BrokerService broker;
+    ActiveMQConnectionFactory factory;
+    Connection connection;
+    Session session;
+    Queue queue;
+    int messageSize = 1024;
+    // The actual message is messageSize * 2, so 4 * messageSize would allow two
+    // messages to be delivered. However, the flush of the cache is asynchronous,
+    // so the flush triggered by the second message maxing out the usage may not
+    // have taken effect in time for the third message to succeed. Making the
+    // memory usage more lenient gives the usageChange listener in the cursor an
+    // opportunity to kick in.
+    int memoryLimit = 12 * messageSize;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        if (broker == null) {
+            broker = new BrokerService();
+            broker.setAdvisorySupport(false);
+        }
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        // Stop the broker first and always delegate to the superclass, so a
+        // failure in either step cannot prevent the other from running.
+        try {
+            if (broker != null) {
+                broker.stop();
+                broker = null;
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Starts the broker and opens a started connection, an auto-acknowledge
+     * session and the test queue over the in-VM transport.
+     */
+    protected void start() throws Exception {
+        broker.start();
+        factory = new ActiveMQConnectionFactory("vm://localhost?jms.alwaysSyncSend=true");
+        factory.setWatchTopicAdvisories(false);
+        connection = factory.createConnection();
+        connection.start();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        queue = session.createQueue("QUEUE." + this.getClass().getName());
+    }
+
+    /**
+     * Closes the session and connection and stops the broker. Each step runs
+     * even if an earlier one throws, so nothing is left open.
+     */
+    protected void stop() throws Exception {
+        try {
+            try {
+                session.close();
+            } finally {
+                connection.close();
+            }
+        } finally {
+            broker.stop();
+            broker = null;
+        }
+    }
+
+    /**
+     * Configures the broker for persistent storage with producer flow control,
+     * cursor caching and send-fail-if-no-space enabled.
+     *
+     * @param memoryLimit currently unused
+     * @param systemLimit limit applied to the broker's system memory usage
+     */
+    protected void configureBroker(long memoryLimit, long systemLimit) throws Exception {
+        // NOTE(review): memoryLimit is accepted but never applied; only
+        // systemLimit takes effect. Possibly the destination policy was meant
+        // to receive it - confirm before wiring it in, as that would change
+        // what the tests exercise.
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.addConnector(bindAddress);
+        broker.setPersistent(true);
+
+        SystemUsage systemUsage = broker.getSystemUsage();
+        systemUsage.setSendFailIfNoSpace(true);
+        systemUsage.getMemoryUsage().setLimit(systemLimit);
+
+        PolicyEntry policy = new PolicyEntry();
+        policy.setProducerFlowControl(true);
+        policy.setUseCache(true);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
+    }
+
+    /**
+     * Builds a message body of exactly {@code messageSize} characters: a short
+     * header naming the index and send time, truncated or space-padded.
+     *
+     * @param index message index embedded in the header
+     * @return text whose length equals {@code messageSize}
+     */
+    protected String createMessageText(int index) {
+        // StringBuilder: the buffer never leaves this method, so the
+        // synchronisation of StringBuffer is unnecessary.
+        StringBuilder buffer = new StringBuilder(messageSize);
+        buffer.append("Message: " + index + " sent at: " + new Date());
+        if (buffer.length() > messageSize) {
+            return buffer.substring(0, messageSize);
+        }
+        for (int i = buffer.length(); i < messageSize; i++) {
+            buffer.append(' ');
+        }
+        return buffer.toString();
+    }
+
+    /**
+     * Starts the broker, sends 200 messages with the given delivery mode and
+     * shuts everything down again, failing the test if a send is rejected for
+     * lack of resources.
+     *
+     * @param deliveryMode a {@link DeliveryMode} constant
+     */
+    protected void sendMessages(int deliveryMode) throws Exception {
+        start();
+        try {
+            MessageProducer producer = session.createProducer(queue);
+            producer.setDeliveryMode(deliveryMode);
+            int i = 0;
+            try {
+                for (i = 0; i < 200; i++) {
+                    TextMessage message = session.createTextMessage(createMessageText(i));
+                    producer.send(message);
+                }
+            } catch (javax.jms.ResourceAllocationException e) {
+                e.printStackTrace();
+                fail(e.getMessage() + " num msgs = " + i + ". percentUsage = " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
+            }
+        } finally {
+            // Runs on failure too, so a failed test does not leak the session
+            // and connection.
+            stop();
+        }
+    }
+
+    // use QueueStorePrefetch
+    public void testTwoUsageEqualPersistent() throws Exception {
+        configureBroker(memoryLimit, memoryLimit);
+        sendMessages(DeliveryMode.PERSISTENT);
+    }
+
+    public void testUseCachePersistent() throws Exception {
+        int limit = memoryLimit / 2;
+        configureBroker(limit, memoryLimit);
+        sendMessages(DeliveryMode.PERSISTENT);
+    }
+
+    public void testMemoryUsageLowPersistent() throws Exception {
+        configureBroker(memoryLimit, 10 * memoryLimit);
+        sendMessages(DeliveryMode.PERSISTENT);
+    }
+
+    // use FilePendingMessageCursor
+    public void testTwoUsageEqualNonPersistent() throws Exception {
+        configureBroker(memoryLimit, memoryLimit);
+        sendMessages(DeliveryMode.NON_PERSISTENT);
+    }
+
+    public void testMemoryUsageLowNonPersistent() throws Exception {
+        configureBroker(memoryLimit, 10 * memoryLimit);
+        sendMessages(DeliveryMode.NON_PERSISTENT);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java
new file mode 100644
index 0000000..6b49c8d
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorJDBCNoDuplicateTest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+
+/**
+ * Runs the {@link StoreQueueCursorNoDuplicateTest} scenario against a broker
+ * backed by a {@link JDBCPersistenceAdapter}.
+ *
+ * @author gtully
+ * @see <a href="https://issues.apache.org/activemq/browse/AMQ-2020">AMQ-2020</a>
+ **/
+public class StoreQueueCursorJDBCNoDuplicateTest extends StoreQueueCursorNoDuplicateTest {
+
+    /**
+     * Creates the parent's broker and swaps in a JDBC persistence adapter.
+     *
+     * @return the broker configured for JDBC persistence
+     */
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        PersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter();
+        broker.setPersistenceAdapter(persistenceAdapter);
+        return broker;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorKahaDBNoDuplicateTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorKahaDBNoDuplicateTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorKahaDBNoDuplicateTest.java
new file mode 100644
index 0000000..11a98d6
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorKahaDBNoDuplicateTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.File;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+
+/**
+ * Runs the {@link StoreQueueCursorNoDuplicateTest} scenario against a broker
+ * backed by a {@link KahaDBStore}.
+ *
+ * @author gtully
+ * @see <a href="https://issues.apache.org/activemq/browse/AMQ-2020">AMQ-2020</a>
+ **/
+public class StoreQueueCursorKahaDBNoDuplicateTest extends StoreQueueCursorNoDuplicateTest {
+
+    /**
+     * Creates the parent's broker and points it at a KahaDB store kept under
+     * the build output directory.
+     *
+     * @return the broker configured for KahaDB persistence
+     */
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        File dataDirectory = new File("target/activemq-data/kahadb");
+        PersistenceAdapter kahaDb = new KahaDBStore();
+        kahaDb.setDirectory(dataDirectory);
+
+        BrokerService configured = super.createBroker();
+        configured.setPersistenceAdapter(kahaDb);
+        return configured;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorLevelDBNoDuplicateTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorLevelDBNoDuplicateTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorLevelDBNoDuplicateTest.java
new file mode 100644
index 0000000..d119d50
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorLevelDBNoDuplicateTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import org.apache.activeio.journal.active.JournalImpl;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.leveldb.LevelDBStore;
+import org.apache.activemq.store.journal.JournalPersistenceAdapter;
+
+import java.io.File;
+
+/**
+ * Runs the {@link StoreQueueCursorNoDuplicateTest} scenario against a broker
+ * backed by a {@link LevelDBStore}.
+ *
+ * @author gtully
+ * @see <a href="https://issues.apache.org/activemq/browse/AMQ-2020">AMQ-2020</a>
+ **/
+public class StoreQueueCursorLevelDBNoDuplicateTest extends StoreQueueCursorNoDuplicateTest {
+
+    /**
+     * Creates the parent's broker and points it at a LevelDB store kept under
+     * the build output directory.
+     *
+     * @return the broker configured for LevelDB persistence
+     */
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        File dataDirectory = new File("target/activemq-data/leveldb");
+        LevelDBStore levelDb = new LevelDBStore();
+        levelDb.setDirectory(dataDirectory);
+
+        BrokerService configured = super.createBroker();
+        configured.setPersistenceAdapter(levelDb);
+        return configured;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java
new file mode 100644
index 0000000..a62748a
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorMemoryNoDuplicateTest.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import org.apache.activemq.broker.BrokerService;
+
+/**
+ * Runs the {@link StoreQueueCursorNoDuplicateTest} scenario against a
+ * non-persistent broker.
+ *
+ * @author gtully
+ * @see <a href="https://issues.apache.org/activemq/browse/AMQ-2020">AMQ-2020</a>
+ **/
+public class StoreQueueCursorMemoryNoDuplicateTest extends StoreQueueCursorNoDuplicateTest {
+
+    /**
+     * Creates the parent's broker with persistence switched off.
+     *
+     * @return the broker configured as non-persistent
+     */
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = super.createBroker();
+        broker.setPersistent(false);
+        return broker;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
new file mode 100644
index 0000000..2406e88
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorNoDuplicateTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.usage.SystemUsage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Fills a {@link QueueStorePrefetch} cursor until its cache is disabled by the
+ * memory limit, then reads every message back in small batches and checks
+ * that each one arrives exactly once and in producer-sequence order.
+ * Subclasses override {@link #createBroker()} to choose the persistence store.
+ *
+ * @author gtully
+ * @see <a href="https://issues.apache.org/activemq/browse/AMQ-2020">AMQ-2020</a>
+ **/
+public class StoreQueueCursorNoDuplicateTest extends TestCase {
+    static final Logger LOG = LoggerFactory.getLogger(StoreQueueCursorNoDuplicateTest.class);
+    ActiveMQQueue destination = new ActiveMQQueue("queue-"
+            + StoreQueueCursorNoDuplicateTest.class.getSimpleName());
+    BrokerService brokerService;
+
+    final static String mesageIdRoot = "11111:22222:0:";
+    final int messageBytesSize = 1024;
+    final String text = new String(new byte[messageBytesSize]);
+
+    /** Number of messages stored and then read back. */
+    protected int count = 6;
+
+    @Override
+    public void setUp() throws Exception {
+        brokerService = createBroker();
+        brokerService.setUseJmx(false);
+        brokerService.deleteAllMessages();
+        brokerService.start();
+    }
+
+    /**
+     * Creates the broker under test; this base version uses the default
+     * configuration.
+     */
+    protected BrokerService createBroker() throws Exception {
+        return new BrokerService();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        brokerService.stop();
+    }
+
+    public void testNoDuplicateAfterCacheFullAndReadPast() throws Exception {
+        final PersistenceAdapter persistenceAdapter = brokerService
+                .getPersistenceAdapter();
+        final MessageStore queueMessageStore = persistenceAdapter
+                .createQueueMessageStore(destination);
+        final ConsumerInfo consumerInfo = new ConsumerInfo();
+        final DestinationStatistics destinationStatistics = new DestinationStatistics();
+        consumerInfo.setExclusive(true);
+
+        final Queue queue = new Queue(brokerService, destination,
+                queueMessageStore, destinationStatistics, null);
+
+        queueMessageStore.start();
+        queueMessageStore.registerIndexListener(null);
+
+        QueueStorePrefetch underTest = new QueueStorePrefetch(queue, brokerService.getBroker());
+        SystemUsage systemUsage = new SystemUsage();
+        // ensure memory limit is reached
+        systemUsage.getMemoryUsage().setLimit(messageBytesSize * (count + 2));
+        underTest.setSystemUsage(systemUsage);
+        underTest.setEnableAudit(false);
+        underTest.start();
+        assertTrue("cache enabled", underTest.isUseCache() && underTest.isCacheEnabled());
+
+        final ConnectionContext contextNotInTx = new ConnectionContext();
+        for (int i = 0; i < count; i++) {
+            ActiveMQTextMessage msg = getMessage(i);
+            msg.setMemoryUsage(systemUsage.getMemoryUsage());
+
+            queueMessageStore.addMessage(contextNotInTx, msg);
+            underTest.addMessageLast(msg);
+        }
+
+        assertFalse("cache is disabled as limit reached", underTest.isCacheEnabled());
+        int dequeueCount = 0;
+
+        // A batch smaller than the message count forces the cursor to page
+        // through the store several times.
+        underTest.setMaxBatchSize(2);
+        underTest.reset();
+        while (underTest.hasNext() && dequeueCount < count) {
+            MessageReference ref = underTest.next();
+            ref.decrementReferenceCount();
+            underTest.remove();
+            LOG.info("Received message: {} with body: {}",
+                     ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText());
+            assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId());
+        }
+        underTest.release();
+        assertEquals(count, dequeueCount);
+    }
+
+    /**
+     * Builds a persistent text message addressed to the test destination whose
+     * broker and producer sequence ids both equal {@code i}.
+     *
+     * @param i sequence number to embed in the message id and body
+     * @return the populated message
+     */
+    private ActiveMQTextMessage getMessage(int i) throws Exception {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        MessageId id = new MessageId(mesageIdRoot + i);
+        id.setBrokerSequenceId(i);
+        id.setProducerSequenceId(i);
+        message.setMessageId(id);
+        message.setDestination(destination);
+        message.setPersistent(true);
+        message.setResponseRequired(true);
+        message.setText("Msg:" + i + " " + text);
+        // Expected value first, so a failure message reads the right way round.
+        assertEquals(i, message.getMessageId().getProducerSequenceId());
+        return message;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
new file mode 100644
index 0000000..f8fab10
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/StoreQueueCursorOrderTest.java
@@ -0,0 +1,517 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.region.cursors;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.AbstractMessageStore;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.usage.SystemUsage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Verifies that a {@link QueueStorePrefetch} cursor hands messages back in
+ * producer-sequence order, and that the store batch position is (or is not)
+ * advanced, when messages are added with a mix of already-known sequence ids
+ * and sequence ids that are only resolved later through a {@link FutureTask}.
+ */
+public class StoreQueueCursorOrderTest {
+    private static final Logger LOG = LoggerFactory.getLogger(StoreQueueCursorOrderTest.class);
+
+    ActiveMQQueue destination = new ActiveMQQueue("queue-"
+            + StoreQueueCursorOrderTest.class.getSimpleName());
+    BrokerService brokerService;
+
+    final static String mesageIdRoot = "11111:22222:0:";
+    final int messageBytesSize = 1024;
+    // body padding built from messageBytesSize zero bytes
+    final String text = new String(new byte[messageBytesSize]);
+
+    @Before
+    public void setUp() throws Exception {
+        brokerService = createBroker();
+        brokerService.setUseJmx(false);
+        brokerService.deleteAllMessages();
+        brokerService.start();
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        return new BrokerService();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        // setUp may have failed before the broker was created; do not mask that failure with an NPE
+        if (brokerService != null) {
+            brokerService.stop();
+        }
+    }
+
+    @Test
+    public void tesBlockedFuture() throws Exception {
+        final int count = 2;
+        final Message[] messages = new Message[count];
+        final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination);
+        // ensure memory limit is reached
+        final SystemUsage systemUsage = createSystemUsage(messageBytesSize * 1);
+        final QueueStorePrefetch underTest = createCursor(queueMessageStore, systemUsage);
+
+        ActiveMQTextMessage msg = createStoredMessage(0, messages, 1, systemUsage);
+        msg.setRecievedByDFBridge(true);
+        // this future is never handed to an executor, so it never completes
+        FutureTask<Long> future = new FutureTask<Long>(new Runnable() {
+            @Override
+            public void run() {
+            }
+        }, 2L);
+        msg.getMessageId().setFutureOrSequenceLong(future);
+        underTest.addMessageLast(msg);
+
+        assertCacheEnabled(underTest);
+
+        // second message will flip the cache but will be stored before the future task
+        msg = createStoredMessage(1, messages, 0, systemUsage);
+        msg.getMessageId().setFutureOrSequenceLong(1L);
+        underTest.addMessageLast(msg);
+
+        assertCacheDisabled(underTest);
+        assertEquals("setBatch unset", 0L, queueMessageStore.batch.get());
+
+        assertDequeuedInOrder(underTest, 2, count);
+    }
+
+    @Test
+    public void testNoSetBatchWithUnOrderedFutureCurrentSync() throws Exception {
+        final int count = 2;
+        final Message[] messages = new Message[count];
+        final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination);
+        // ensure memory limit is reached
+        final SystemUsage systemUsage = createSystemUsage(messageBytesSize * 1);
+        final QueueStorePrefetch underTest = createCursor(queueMessageStore, systemUsage);
+
+        ActiveMQTextMessage msg = createStoredMessage(0, messages, 1, systemUsage);
+        runAsync(attachSequenceFuture(msg, 1L));
+        underTest.addMessageLast(msg);
+
+        assertCacheEnabled(underTest);
+
+        // second message will flip the cache but will be stored before the future task
+        msg = createStoredMessage(1, messages, 0, systemUsage);
+        msg.getMessageId().setFutureOrSequenceLong(1L);
+        underTest.addMessageLast(msg);
+
+        assertCacheDisabled(underTest);
+        assertEquals("setBatch unset", 0L, queueMessageStore.batch.get());
+
+        assertDequeuedInOrder(underTest, 2, count);
+    }
+
+    @Test
+    public void testSetBatchWithOrderedFutureCurrentFuture() throws Exception {
+        final int count = 2;
+        final Message[] messages = new Message[count];
+        final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination);
+        // ensure memory limit is reached
+        final SystemUsage systemUsage = createSystemUsage(messageBytesSize * 1);
+        final QueueStorePrefetch underTest = createCursor(queueMessageStore, systemUsage);
+
+        ActiveMQTextMessage msg = createStoredMessage(0, messages, 0, systemUsage);
+        runAsync(attachSequenceFuture(msg, 0L));
+        underTest.addMessageLast(msg);
+
+        assertCacheEnabled(underTest);
+
+        // second message will flip the cache but will be stored before the future task
+        msg = createStoredMessage(1, messages, 1, systemUsage);
+        runAsync(attachSequenceFuture(msg, 1L));
+        underTest.addMessageLast(msg);
+
+        assertCacheDisabled(underTest);
+        assertEquals("setBatch set", 1L, queueMessageStore.batch.get());
+
+        assertDequeuedInOrder(underTest, 2, count);
+    }
+
+    @Test
+    public void testSetBatchWithFuture() throws Exception {
+        final int count = 4;
+        final Message[] messages = new Message[count];
+        final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination);
+        // ensure memory limit is reached
+        final SystemUsage systemUsage = createSystemUsage(messageBytesSize * (count + 6));
+        final QueueStorePrefetch underTest = createCursor(queueMessageStore, systemUsage);
+
+        // the first future is only started after the message has been added to the cursor
+        ActiveMQTextMessage msg = createStoredMessage(0, messages, 0, systemUsage);
+        FutureTask<Long> future0 = attachSequenceFuture(msg, 0L);
+        underTest.addMessageLast(msg);
+        runAsync(future0);
+
+        msg = createStoredMessage(1, messages, 3, systemUsage);
+        FutureTask<Long> future1 = attachSequenceFuture(msg, 3L);
+        underTest.addMessageLast(msg);
+
+        msg = createStoredMessage(2, messages, 1, systemUsage);
+        msg.getMessageId().setFutureOrSequenceLong(1L);
+        underTest.addMessageLast(msg);
+
+        assertCacheEnabled(underTest);
+
+        // out of order future
+        runAsync(future1);
+
+        // sync add to flip cache
+        msg = createStoredMessage(3, messages, 2, systemUsage);
+        msg.getMessageId().setFutureOrSequenceLong(3L);
+        underTest.addMessageLast(msg);
+
+        assertCacheDisabled(underTest);
+        assertEquals("setBatch set", 2L, queueMessageStore.batch.get());
+
+        assertDequeuedInOrder(underTest, count, count);
+    }
+
+    @Test
+    public void testSetBatch() throws Exception {
+        final int count = 3;
+        final Message[] messages = new Message[count];
+        final TestMessageStore queueMessageStore = new TestMessageStore(messages, destination);
+        // ensure memory limit is reached
+        final SystemUsage systemUsage = createSystemUsage(messageBytesSize * 5);
+        final QueueStorePrefetch underTest = createCursor(queueMessageStore, systemUsage);
+
+        ActiveMQTextMessage msg = createStoredMessage(0, messages, 0, systemUsage);
+        msg.getMessageId().setFutureOrSequenceLong(0L);
+        underTest.addMessageLast(msg);
+
+        msg = createStoredMessage(1, messages, 1, systemUsage);
+        msg.getMessageId().setFutureOrSequenceLong(1L);
+        underTest.addMessageLast(msg);
+
+        assertCacheEnabled(underTest);
+
+        msg = createStoredMessage(2, messages, 2, systemUsage);
+        msg.getMessageId().setFutureOrSequenceLong(2L);
+        underTest.addMessageLast(msg);
+
+        assertCacheDisabled(underTest);
+        assertEquals("setBatch set", 2L, queueMessageStore.batch.get());
+
+        assertDequeuedInOrder(underTest, 2, count);
+    }
+
+    /** Creates a {@link SystemUsage} whose memory usage is capped at {@code memoryLimit} bytes. */
+    private SystemUsage createSystemUsage(long memoryLimit) {
+        SystemUsage systemUsage = new SystemUsage();
+        systemUsage.getMemoryUsage().setLimit(memoryLimit);
+        return systemUsage;
+    }
+
+    /**
+     * Starts {@code store}, wraps it in a {@link Queue} on the test destination and
+     * returns a started cursor over that queue with auditing switched off.
+     * Asserts that the cursor cache is enabled right after start.
+     */
+    private QueueStorePrefetch createCursor(TestMessageStore store, SystemUsage systemUsage) throws Exception {
+        final Queue queue = new Queue(brokerService, destination,
+                store, new DestinationStatistics(), null);
+
+        store.start();
+        store.registerIndexListener(null);
+
+        QueueStorePrefetch cursor = new QueueStorePrefetch(queue, brokerService.getBroker());
+        cursor.setSystemUsage(systemUsage);
+        cursor.setEnableAudit(false);
+        cursor.start();
+        assertCacheEnabled(cursor);
+        return cursor;
+    }
+
+    /**
+     * Creates message {@code i}, places it at {@code storeIndex} in the array backing
+     * the test store and attaches the memory usage tracker of {@code systemUsage}.
+     */
+    private ActiveMQTextMessage createStoredMessage(int i, Message[] messages, int storeIndex,
+                                                    SystemUsage systemUsage) throws Exception {
+        ActiveMQTextMessage msg = getMessage(i);
+        messages[storeIndex] = msg;
+        msg.setMemoryUsage(systemUsage.getMemoryUsage());
+        return msg;
+    }
+
+    /**
+     * Flags {@code msg} as received by a DF bridge and installs a future as its
+     * sequence. When run, the future replaces itself with {@code sequence} on the
+     * message id and completes with that value. The future is NOT started here.
+     */
+    private FutureTask<Long> attachSequenceFuture(final ActiveMQTextMessage msg, final long sequence) {
+        msg.setRecievedByDFBridge(true);
+        FutureTask<Long> future = new FutureTask<Long>(new Runnable() {
+            @Override
+            public void run() {
+                msg.getMessageId().setFutureOrSequenceLong(sequence);
+            }
+        }, sequence);
+        msg.getMessageId().setFutureOrSequenceLong(future);
+        return future;
+    }
+
+    /** Runs {@code task} on its own worker thread and releases that thread once the task is done. */
+    private void runAsync(FutureTask<Long> task) {
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        executor.submit(task);
+        // orderly shutdown: the submitted task still runs, then the worker thread exits
+        executor.shutdown();
+    }
+
+    private void assertCacheEnabled(QueueStorePrefetch cursor) {
+        assertTrue("cache enabled", cursor.isUseCache() && cursor.isCacheEnabled());
+    }
+
+    private void assertCacheDisabled(QueueStorePrefetch cursor) {
+        assertTrue("cache is disabled as limit reached", !cursor.isCacheEnabled());
+    }
+
+    /**
+     * Drains up to {@code count} messages from {@code cursor} and asserts that exactly
+     * {@code count} arrive with producer sequence ids 0, 1, 2, ... in that order.
+     */
+    private void assertDequeuedInOrder(QueueStorePrefetch cursor, int maxBatchSize, int count) throws Exception {
+        int dequeueCount = 0;
+
+        cursor.setMaxBatchSize(maxBatchSize);
+        cursor.reset();
+        while (cursor.hasNext() && dequeueCount < count) {
+            MessageReference ref = cursor.next();
+            ref.decrementReferenceCount();
+            cursor.remove();
+            LOG.info("Received message: {} with body: {}",
+                     ref.getMessageId(), ((ActiveMQTextMessage)ref.getMessage()).getText());
+            assertEquals(dequeueCount++, ref.getMessageId().getProducerSequenceId());
+        }
+        cursor.release();
+        assertEquals(count, dequeueCount);
+    }
+
+    /**
+     * Builds a persistent text message whose broker and producer sequence ids
+     * are both {@code i}.
+     */
+    private ActiveMQTextMessage getMessage(int i) throws Exception {
+        ActiveMQTextMessage message = new ActiveMQTextMessage();
+        MessageId id = new MessageId(mesageIdRoot + i);
+        id.setBrokerSequenceId(i);
+        id.setProducerSequenceId(i);
+        message.setMessageId(id);
+        message.setDestination(destination);
+        message.setPersistent(true);
+        message.setResponseRequired(true);
+        message.setText("Msg:" + i + " " + text);
+        // expected value first, so a failure reports the values the right way round
+        assertEquals(i, message.getMessageId().getProducerSequenceId());
+        return message;
+    }
+
+    /**
+     * Array-backed message store stub. Recovery replays the array from the current
+     * {@link #batch} position; all mutating store operations are no-ops.
+     */
+    class TestMessageStore extends AbstractMessageStore {
+        final Message[] messages;
+        // index of the next array slot to replay; moved by setBatch
+        public AtomicLong batch = new AtomicLong();
+
+        public TestMessageStore(Message[] messages, ActiveMQDestination dest) {
+            super(dest);
+            this.messages = messages;
+        }
+
+        @Override
+        public void addMessage(ConnectionContext context, Message message) throws IOException {
+
+        }
+
+        @Override
+        public Message getMessage(MessageId identity) throws IOException {
+            return null;
+        }
+
+        @Override
+        public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
+
+        }
+
+        @Override
+        public void removeAllMessages(ConnectionContext context) throws IOException {
+
+        }
+
+        @Override
+        public void recover(MessageRecoveryListener container) throws Exception {
+
+        }
+
+        @Override
+        public int getMessageCount() throws IOException {
+            return 0;
+        }
+
+        @Override
+        public void resetBatching() {
+
+        }
+
+        /**
+         * Replays every message from the batch position to the end of the array.
+         * Note that {@code maxReturned} is ignored by this stub.
+         */
+        @Override
+        public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
+            for (int i = batch.intValue(); i < messages.length; i++) {
+                LOG.info("recovered index:" + i);
+                listener.recoverMessage(messages[i]);
+            }
+        }
+
+        /**
+         * Positions the batch just after the given message. The cast fails with a
+         * ClassCastException if the id still carries an unresolved future.
+         */
+        @Override
+        public void setBatch(MessageId message) {
+            batch.set((Long)message.getFutureOrSequenceLong());
+            batch.incrementAndGet();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketTest.java
new file mode 100644
index 0000000..fc35be6
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupHashBucketTest.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.group;
+
+
+/**
+ * Runs the inherited {@link MessageGroupMapTest} cases against a
+ * {@link MessageGroupHashBucket} instead of the default map implementation.
+ */
+public class MessageGroupHashBucketTest extends MessageGroupMapTest {
+
+    @Override
+    protected MessageGroupMap createMessageGroupMap() {
+        return new MessageGroupHashBucket(1024, 64);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupMapTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupMapTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupMapTest.java
new file mode 100644
index 0000000..ab02426
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupMapTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.group;
+
+import junit.framework.TestCase;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.SessionId;
+
+/**
+ * Exercises group-to-consumer bookkeeping of a {@link MessageGroupMap}.
+ * The map under test comes from {@link #createMessageGroupMap()}, which
+ * subclasses may override to run the same cases against another implementation.
+ */
+public class MessageGroupMapTest extends TestCase {
+
+    protected MessageGroupMap map;
+    private ConsumerId consumer1;
+    private ConsumerId consumer2;
+    private ConsumerId consumer3;
+    private long idCounter;
+
+    /** One consumer owns three groups; removing the consumer releases all three. */
+    public void testSingleConsumerForManyBucks() throws Exception {
+        assertGet("1", null);
+
+        map.put("1", consumer1);
+        assertGet("1", consumer1);
+        map.put("2", consumer1);
+        assertGet("2", consumer1);
+        map.put("3", consumer1);
+        assertGet("3", consumer1);
+
+        MessageGroupSet set = map.removeConsumer(consumer1);
+        assertContains(set, "1");
+        assertContains(set, "2");
+        assertContains(set, "3");
+        assertGet("1", null);
+        assertGet("2", null);
+        assertGet("3", null);
+    }
+
+    /** Each group has its own consumer; a released group can be claimed by another consumer. */
+    public void testManyConsumers() throws Exception {
+        assertGet("1", null);
+
+        map.put("1", consumer1);
+        assertGet("1", consumer1);
+        map.put("2", consumer2);
+        assertGet("2", consumer2);
+        map.put("3", consumer3);
+        assertGet("3", consumer3);
+
+        MessageGroupSet set = map.removeConsumer(consumer1);
+        assertContains(set, "1");
+
+        assertGet("1", null);
+        map.put("1", consumer2);
+        assertGet("1", consumer2);
+
+        set = map.removeConsumer(consumer2);
+        assertContains(set, "1");
+        assertContains(set, "2");
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        map = createMessageGroupMap();
+        consumer1 = createConsumerId();
+        consumer2 = createConsumerId();
+        consumer3 = createConsumerId();
+    }
+
+    /** Factory hook for the map implementation under test. */
+    protected MessageGroupMap createMessageGroupMap() {
+        return new SimpleMessageGroupMap();
+    }
+
+    /** Builds a consumer id from three fresh values of the running counter. */
+    protected ConsumerId createConsumerId() {
+        ConnectionId connectionId = new ConnectionId("" + ++idCounter);
+        SessionId sessionId = new SessionId(connectionId, ++idCounter);
+        return new ConsumerId(sessionId, ++idCounter);
+    }
+
+    /** Asserts that {@code groupId} is currently mapped to {@code expected} (or unmapped when null). */
+    protected void assertGet(String groupId, ConsumerId expected) {
+        ConsumerId actual = map.get(groupId);
+        assertEquals("Entry for groupId: " + groupId, expected, actual);
+    }
+
+    /** Asserts that {@code set} contains {@code groupID}. */
+    protected void assertContains(MessageGroupSet set, String groupID) {
+        assertTrue("MessageGroup set: " + set + " does not contain groupID: " + groupID,
+                   set.contains(groupID));
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupTest.java
new file mode 100644
index 0000000..86741c9
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/group/MessageGroupTest.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.group;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.JmsTestSupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JMS-level tests for message groups ({@code JMSXGroupID}): a group sticks to one
+ * consumer, moves on when that consumer closes, and can be closed explicitly with
+ * {@code JMSXGroupSeq = -1}.
+ */
+public class MessageGroupTest extends JmsTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MessageGroupTest.class);
+
+    public void testGroupedMessagesDeliveredToOnlyOneConsumer() throws Exception {
+
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+        // Setup a first connection
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer1 = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+
+        // Send the messages.
+        for (int i = 0; i < 4; i++) {
+            TextMessage message = session.createTextMessage("message " + i);
+            message.setStringProperty("JMSXGroupID", "TEST-GROUP");
+            message.setIntProperty("JMSXGroupSeq", i + 1);
+            LOG.info("sending message: " + message);
+            producer.send(message);
+        }
+
+        // All the messages should have been sent down connection 1.. just get
+        // the first 3
+        for (int i = 0; i < 3; i++) {
+            TextMessage m1 = (TextMessage)consumer1.receive(500);
+            assertNotNull("m1 is null for index: " + i, m1);
+            assertEquals(i + 1, m1.getIntProperty("JMSXGroupSeq"));
+        }
+
+        // Setup a second connection
+        Connection connection1 = factory.createConnection(userName, password);
+        try {
+            connection1.start();
+            Session session2 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer2 = session2.createConsumer(destination);
+
+            // Close the first consumer.
+            consumer1.close();
+
+            // The last message should now go to the second consumer.
+            for (int i = 0; i < 1; i++) {
+                TextMessage m1 = (TextMessage)consumer2.receive(500);
+                assertNotNull("m1 is null for index: " + i, m1);
+                assertEquals(4 + i, m1.getIntProperty("JMSXGroupSeq"));
+            }
+
+            //assert that there are no other messages left for the consumer 2
+            Message m = consumer2.receive(100);
+            assertNull("consumer 2 has some messages left", m);
+        } finally {
+            // this connection is created by the test itself, so the test must close it
+            connection1.close();
+        }
+    }
+
+    public void testAddingConsumer() throws Exception {
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+        // Setup a first connection
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        MessageProducer producer = session.createProducer(destination);
+
+        TextMessage message = session.createTextMessage("message");
+        message.setStringProperty("JMSXGroupID", "TEST-GROUP");
+
+        LOG.info("sending message: " + message);
+        producer.send(message);
+
+        // the consumer is created only after the message was sent
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        // bounded wait: a missing message fails the assertion instead of hanging the suite
+        TextMessage msg = (TextMessage)consumer.receive(5000);
+        assertNotNull(msg);
+        boolean first = msg.getBooleanProperty("JMSXGroupFirstForConsumer");
+        assertTrue(first);
+    }
+
+    public void testClosingMessageGroup() throws Exception {
+
+        ActiveMQDestination destination = new ActiveMQQueue("TEST");
+
+        // Setup a first connection
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer1 = session.createConsumer(destination);
+        MessageProducer producer = session.createProducer(destination);
+
+        // Send the messages.
+        for (int i = 0; i < 4; i++) {
+            TextMessage message = session.createTextMessage("message " + i);
+            message.setStringProperty("JMSXGroupID", "TEST-GROUP");
+            LOG.info("sending message: " + message);
+            producer.send(message);
+        }
+
+        // All the messages should have been sent down consumer1.. just get
+        // the first 3
+        for (int i = 0; i < 3; i++) {
+            TextMessage m1 = (TextMessage)consumer1.receive(500);
+            assertNotNull("m1 is null for index: " + i, m1);
+        }
+
+        // Setup a second consumer
+        Connection connection1 = factory.createConnection(userName, password);
+        try {
+            connection1.start();
+            Session session2 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer2 = session2.createConsumer(destination);
+
+            //assert that there are no messages for the consumer 2
+            Message m = consumer2.receive(100);
+            assertNull("consumer 2 has some messages", m);
+
+            // Close the group
+            TextMessage message = session.createTextMessage("message " + 5);
+            message.setStringProperty("JMSXGroupID", "TEST-GROUP");
+            message.setIntProperty("JMSXGroupSeq", -1);
+            LOG.info("sending message: " + message);
+            producer.send(message);
+
+            //Send some more messages
+            for (int i = 0; i < 4; i++) {
+                message = session.createTextMessage("message " + i);
+                message.setStringProperty("JMSXGroupID", "TEST-GROUP");
+                LOG.info("sending message: " + message);
+                producer.send(message);
+            }
+
+            // Receive the fourth message
+            TextMessage m1 = (TextMessage)consumer1.receive(500);
+            assertNotNull("m1 is null for index: " + 4, m1);
+
+            // Receive the closing message
+            m1 = (TextMessage)consumer1.receive(500);
+            assertNotNull("m1 is null for index: " + 5, m1);
+
+            //assert that there are no messages for the consumer 1
+            m = consumer1.receive(100);
+            assertNull("consumer 1 has some messages left: " + m, m);
+
+            // The messages should now go to the second consumer.
+            for (int i = 0; i < 4; i++) {
+                m1 = (TextMessage)consumer2.receive(500);
+                assertNotNull("m1 is null for index: " + i, m1);
+            }
+        } finally {
+            // this connection is created by the test itself, so the test must close it
+            connection1.close();
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
new file mode 100644
index 0000000..44ef670
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ScheduledMessage;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JmsCronSchedulerTest extends JobSchedulerTestSupport {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JmsCronSchedulerTest.class);
+
+    @Test
+    public void testSimulatenousCron() throws Exception {
+
+        final int COUNT = 10;
+        final AtomicInteger count = new AtomicInteger();
+        Connection connection = createConnection();
+
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                count.incrementAndGet();
+                latch.countDown();
+                assertTrue(message instanceof TextMessage);
+                TextMessage tm = (TextMessage) message;
+                try {
+                    LOG.info("Received [{}] count: {} ", tm.getText(), 
count.get());
+                } catch (JMSException e) {
+                    LOG.error("Unexpected exception in onMessage", e);
+                    fail("Unexpected exception in onMessage: " + 
e.getMessage());
+                }
+            }
+        });
+
+        connection.start();
+        for (int i = 0; i < COUNT; i++) {
+            MessageProducer producer = session.createProducer(destination);
+            TextMessage message = session.createTextMessage("test msg "+ i);
+            message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* 
* * * *");
+            producer.send(message);
+            LOG.info("Message {} sent at {}", i, new Date().toString());
+            producer.close();
+            //wait a couple sec so cron start time is different for next 
message
+            Thread.sleep(2000);
+        }
+        SchedulerBroker sb = (SchedulerBroker) 
this.broker.getBroker().getAdaptor(SchedulerBroker.class);
+        JobScheduler js = sb.getJobScheduler();
+        List<Job> list = js.getAllJobs();
+        assertEquals(COUNT, list.size());
+        latch.await(2, TimeUnit.MINUTES);
+        //All should messages should have been received by now
+        assertEquals(COUNT, count.get());
+    }
+
+    @Test
+    public void testCronScheduleWithTtlSet() throws Exception {
+
+        Connection connection = createConnection();
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(destination);
+        connection.start();
+
+        MessageProducer producer = session.createProducer(destination);
+        producer.setTimeToLive(TimeUnit.MINUTES.toMillis(1));
+        TextMessage message = session.createTextMessage("test msg ");
+        message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * 
* *");
+
+        producer.send(message);
+        producer.close();
+
+        Thread.sleep(TimeUnit.MINUTES.toMillis(2));
+
+        assertNotNull(consumer.receiveNoWait());
+        assertNull(consumer.receiveNoWait());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
new file mode 100644
index 0000000..0ce584d
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsSchedulerTest.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.util.ProducerThread;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+
+/**
+ * Tests scheduled delivery of JMS messages using the delay, period, repeat
+ * and cron properties defined by {@link ScheduledMessage}.
+ */
+public class JmsSchedulerTest extends JobSchedulerTestSupport {
+
+    /**
+     * Sends one message that combines a cron expression with a delay, a period
+     * and a repeat count of {@code COUNT - 1}, checks that exactly one job is
+     * registered and waits for {@code COUNT} deliveries.
+     *
+     * @throws Exception if any JMS or broker operation fails
+     */
+    @Test
+    public void testCron() throws Exception {
+        final int COUNT = 10;
+        final AtomicInteger count = new AtomicInteger();
+        Connection connection = createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            final CountDownLatch latch = new CountDownLatch(COUNT);
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    // Increment before releasing the latch: the test thread reads the
+                    // counter as soon as the latch opens and must see the final value.
+                    count.incrementAndGet();
+                    latch.countDown();
+                }
+            });
+
+            connection.start();
+            MessageProducer producer = session.createProducer(destination);
+            TextMessage message = session.createTextMessage("test msg");
+            long time = 1000;
+            message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "* * * * *");
+            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500);
+            message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, COUNT - 1);
+
+            producer.send(message);
+            producer.close();
+
+            Thread.sleep(500);
+            SchedulerBroker sb = (SchedulerBroker) this.broker.getBroker().getAdaptor(SchedulerBroker.class);
+            JobScheduler js = sb.getJobScheduler();
+            List<Job> list = js.getAllJobs();
+            assertEquals(1, list.size());
+            latch.await(240, TimeUnit.SECONDS);
+            assertEquals(COUNT, count.get());
+        } finally {
+            // Also closes the session and consumer created from this connection.
+            connection.close();
+        }
+    }
+
+    /**
+     * Sends one message with a five second delivery delay and verifies that it
+     * has not arrived after two seconds but does arrive within the following
+     * five seconds.
+     *
+     * @throws Exception if any JMS operation fails or the wait is interrupted
+     */
+    @Test
+    public void testSchedule() throws Exception {
+        final int COUNT = 1;
+        Connection connection = createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            final CountDownLatch latch = new CountDownLatch(COUNT);
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    latch.countDown();
+                }
+            });
+
+            connection.start();
+            long time = 5000;
+            MessageProducer producer = session.createProducer(destination);
+            TextMessage message = session.createTextMessage("test msg");
+
+            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+
+            producer.send(message);
+            producer.close();
+            // make sure the message isn't delivered early
+            Thread.sleep(2000);
+            assertEquals(COUNT, latch.getCount());
+            latch.await(5, TimeUnit.SECONDS);
+            assertEquals(0, latch.getCount());
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Same scenario as {@link #testSchedule()} but on a transacted session: the
+     * send is committed by the test thread and the receipt is committed inside
+     * the message listener.
+     *
+     * @throws Exception if any JMS operation fails or the wait is interrupted
+     */
+    @Test
+    public void testTransactedSchedule() throws Exception {
+        final int COUNT = 1;
+        // Commit failures happen on the listener thread; they are counted here so
+        // the test thread can report them instead of silently passing.
+        final AtomicInteger commitFailures = new AtomicInteger();
+        Connection connection = createConnection();
+        try {
+            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            final CountDownLatch latch = new CountDownLatch(COUNT);
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        session.commit();
+                    } catch (JMSException e) {
+                        // No logger is declared in this class, so keep the stack trace output.
+                        e.printStackTrace();
+                        commitFailures.incrementAndGet();
+                    }
+                    latch.countDown();
+                }
+            });
+
+            connection.start();
+            long time = 5000;
+            MessageProducer producer = session.createProducer(destination);
+            TextMessage message = session.createTextMessage("test msg");
+
+            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+
+            producer.send(message);
+            session.commit();
+            producer.close();
+            // make sure the message isn't delivered early
+            Thread.sleep(2000);
+            assertEquals(COUNT, latch.getCount());
+            latch.await(5, TimeUnit.SECONDS);
+            assertEquals(0, latch.getCount());
+            assertEquals("session.commit() failed in the message listener", 0, commitFailures.get());
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Sends one message with a delay, a period and a repeat count of
+     * {@code NUMBER - 1} and verifies that exactly {@code NUMBER} copies are
+     * delivered, including after one extra second of waiting.
+     *
+     * @throws Exception if any JMS operation fails or the wait is interrupted
+     */
+    @Test
+    public void testScheduleRepeated() throws Exception {
+        final int NUMBER = 10;
+        final AtomicInteger count = new AtomicInteger();
+        Connection connection = createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            final CountDownLatch latch = new CountDownLatch(NUMBER);
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    // Same order as in testCron(): counter first, then the latch.
+                    count.incrementAndGet();
+                    latch.countDown();
+                }
+            });
+
+            connection.start();
+            MessageProducer producer = session.createProducer(destination);
+            TextMessage message = session.createTextMessage("test msg");
+            long time = 1000;
+            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 500);
+            message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, NUMBER - 1);
+            producer.send(message);
+            producer.close();
+            assertEquals(NUMBER, latch.getCount());
+            latch.await(10, TimeUnit.SECONDS);
+            assertEquals(0, latch.getCount());
+            // wait a little longer - make sure we only get NUMBER of replays
+            Thread.sleep(1000);
+            assertEquals(NUMBER, count.get());
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Schedules a delayed message, restarts the broker, verifies that the
+     * message is still delivered afterwards and finally schedules the message
+     * once more against the restarted broker.
+     *
+     * @throws Exception if any JMS or broker operation fails
+     */
+    @Test
+    public void testScheduleRestart() throws Exception {
+        final long time = 5000;
+        TextMessage message;
+
+        // send a message
+        Connection connection = createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            connection.start();
+            MessageProducer producer = session.createProducer(destination);
+            message = session.createTextMessage("test msg");
+            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+            producer.send(message);
+            producer.close();
+        } finally {
+            // Release the client before its broker goes away.
+            connection.close();
+        }
+
+        //restart broker
+        broker.stop();
+        broker.waitUntilStopped();
+
+        broker = createBroker(false);
+        broker.start();
+        broker.waitUntilStarted();
+
+
+        // consume the message
+        connection = createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(destination);
+            Message msg = consumer.receive(5000);
+            assertNotNull("Didn't receive the message", msg);
+
+            //send another message
+            MessageProducer producer = session.createProducer(destination);
+            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+            producer.send(message);
+            producer.close();
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Lowers the job scheduler store limit, starts a producer thread that sends
+     * 100 delayed messages and expects no deliveries while the limit is low;
+     * then raises the limit and expects every message to be sent and received.
+     *
+     * @throws Exception if any JMS or broker operation fails or a wait is interrupted
+     */
+    @Test
+    public void testJobSchedulerStoreUsage() throws Exception {
+
+        // Shrink the store limit down so we get the producer to block
+        broker.getSystemUsage().getJobSchedulerUsage().setLimit(10 * 1024);
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
+        Connection conn = factory.createConnection();
+        try {
+            conn.start();
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final long time = 5000;
+            final ProducerThread producer = new ProducerThread(sess, destination) {
+                @Override
+                protected Message createMessage(int i) throws Exception {
+                    Message message = super.createMessage(i);
+                    message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
+                    return message;
+                }
+            };
+            producer.setMessageCount(100);
+            producer.start();
+
+            MessageConsumer consumer = sess.createConsumer(destination);
+            final CountDownLatch latch = new CountDownLatch(100);
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    latch.countDown();
+                }
+            });
+
+            // wait for the producer to block, which should happen immediately, and also wait long
+            // enough for the delay to elapse.  We should see no deliveries as the send should block
+            // on the first message.
+            Thread.sleep(10000L);
+
+            assertEquals(100, latch.getCount());
+
+            // Increase the store limit so the producer unblocks.  Everything should enqueue at this point.
+            broker.getSystemUsage().getJobSchedulerUsage().setLimit(1024 * 1024 * 33);
+
+            // Wait long enough that the messages are enqueued and the delivery delay has elapsed.
+            Thread.sleep(10000L);
+
+            // Make sure we sent all the messages we expected to send
+            Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return producer.getSentCount() == producer.getMessageCount();
+                }
+            }, 20000L);
+
+            assertEquals("Producer didn't send all messages", producer.getMessageCount(), producer.getSentCount());
+
+            // Make sure we got all the messages we expected to get
+            latch.await(20000L, TimeUnit.MILLISECONDS);
+
+            assertEquals("Consumer did not receive all messages.", 0, latch.getCount());
+        } finally {
+            // Closing the connection also ends the session used by the producer thread.
+            conn.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java
new file mode 100644
index 0000000..641ee97
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JobSchedulerBrokerShutdownTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.scheduler;
+
+import java.io.File;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.Session;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.ProducerThread;
+
+/**
+ * Starts a producer of delayed messages against a persistent broker that has
+ * scheduler support enabled and a very small store limit.
+ *
+ * NOTE(review): the test makes no assertions and never closes its connection;
+ * presumably the point is that the broker can still be shut down by the
+ * inherited tear-down while the producer is active - confirm.
+ */
+public class JobSchedulerBrokerShutdownTest extends EmbeddedBrokerTestSupport {
+
+    /**
+     * Builds the broker on top of the parent configuration: an emptied
+     * {@code target/scheduler} directory for the scheduler, {@code target} as
+     * data directory, a store usage limit of 512 and all messages deleted.
+     *
+     * @return the configured broker
+     * @throws Exception if the parent factory or the broker configuration fails
+     */
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        final File schedulerDir = new File("target/scheduler");
+
+        // Make sure the directory exists, then wipe whatever is inside it.
+        IOHelper.mkdirs(schedulerDir);
+        IOHelper.deleteChildren(schedulerDir);
+
+        final BrokerService service = super.createBroker();
+        service.setSchedulerSupport(true);
+        service.setDataDirectory("target");
+        service.setSchedulerDirectoryFile(schedulerDir);
+        service.getSystemUsage().getStoreUsage().setLimit(512);
+        service.deleteAllMessages();
+        return service;
+    }
+
+    /**
+     * @return always {@code true}
+     */
+    @Override
+    protected boolean isPersistent() {
+        return true;
+    }
+
+    /**
+     * Starts a daemon producer thread configured for 200 messages, each with a
+     * one second scheduled delay, and then sleeps for five seconds.
+     *
+     * @throws Exception if a JMS operation fails or the sleep is interrupted
+     */
+    public void testSchedule() throws Exception {
+
+        final long delayMillis = 1000;
+
+        Connection connection = createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        connection.start();
+
+        ProducerThread sender = new ProducerThread(session, destination) {
+            @Override
+            protected Message createMessage(int i) throws Exception {
+                Message scheduled = super.createMessage(i);
+                scheduled.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delayMillis);
+                return scheduled;
+            }
+        };
+
+        sender.setDaemon(true);
+        sender.setMessageCount(200);
+        sender.start();
+
+        Thread.sleep(5000);
+    }
+}

Reply via email to