http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java
new file mode 100644
index 0000000..b32c7ad
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ActiveMQSlowConsumerManualTest.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import 
org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
+import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * @author James Furness
+ *         https://issues.apache.org/jira/browse/AMQ-3607
+ */
+public class ActiveMQSlowConsumerManualTest {
+    private static final int PORT = 12345;
+    private static final ActiveMQTopic TOPIC = new ActiveMQTopic("TOPIC");
+    private static final String URL = "nio://localhost:" + PORT + 
"?socket.tcpNoDelay=true";
+
+    @Test(timeout = 60000)
+    public void testDefaultSettings() throws Exception {
+        runTest("testDefaultSettings", 30, -1, -1, false, false, false, false);
+    }
+
+    @Test(timeout = 60000)
+    public void testDefaultSettingsWithOptimiseAcknowledge() throws Exception {
+        runTest("testDefaultSettingsWithOptimiseAcknowledge", 30, -1, -1, 
false, false, true, false);
+    }
+
+    @Test(timeout = 60000)
+    public void testBounded() throws Exception {
+        runTest("testBounded", 30, 5, 25, false, false, false, false);
+    }
+
+    @Test(timeout = 60000)
+    public void testBoundedWithOptimiseAcknowledge() throws Exception {
+        runTest("testBoundedWithOptimiseAcknowledge", 30, 5, 25, false, false, 
true, false);
+    }
+
+    public void runTest(String name, int sendMessageCount, int prefetchLimit, 
int messageLimit, boolean evictOldestMessage, boolean disableFlowControl, 
boolean optimizeAcknowledge, boolean persistent) throws Exception {
+        BrokerService broker = createBroker(persistent);
+        broker.setDestinationPolicy(buildPolicy(TOPIC, prefetchLimit, 
messageLimit, evictOldestMessage, disableFlowControl));
+        broker.start();
+
+        // Slow consumer
+        Session slowConsumerSession = buildSession("SlowConsumer", URL, 
optimizeAcknowledge);
+        final CountDownLatch blockSlowConsumer = new CountDownLatch(1);
+        final AtomicInteger slowConsumerReceiveCount = new AtomicInteger();
+        final List<Integer> slowConsumerReceived = sendMessageCount <= 1000 ? 
new ArrayList<Integer>() : null;
+        MessageConsumer slowConsumer = createSubscriber(slowConsumerSession,
+                new MessageListener() {
+                    @Override
+                    public void onMessage(Message message) {
+                        try {
+                            slowConsumerReceiveCount.incrementAndGet();
+                            int count = Integer.parseInt(((TextMessage) 
message).getText());
+                            if (slowConsumerReceived != null) 
slowConsumerReceived.add(count);
+                            if (count % 10000 == 0) 
System.out.println("SlowConsumer: Receive " + count);
+                            blockSlowConsumer.await();
+                        } catch (Exception ignored) {
+                        }
+                    }
+                }
+        );
+
+        // Fast consumer
+        Session fastConsumerSession = buildSession("FastConsumer", URL, 
optimizeAcknowledge);
+        final AtomicInteger fastConsumerReceiveCount = new AtomicInteger();
+        final List<Integer> fastConsumerReceived = sendMessageCount <= 1000 ? 
new ArrayList<Integer>() : null;
+        MessageConsumer fastConsumer = createSubscriber(fastConsumerSession,
+                new MessageListener() {
+                    @Override
+                    public void onMessage(Message message) {
+                        try {
+                            fastConsumerReceiveCount.incrementAndGet();
+                            TimeUnit.MILLISECONDS.sleep(5);
+                            int count = Integer.parseInt(((TextMessage) 
message).getText());
+                            if (fastConsumerReceived != null) 
fastConsumerReceived.add(count);
+                            if (count % 10000 == 0) 
System.out.println("FastConsumer: Receive " + count);
+                        } catch (Exception ignored) {
+                        }
+                    }
+                }
+        );
+
+        // Wait for consumers to connect
+        Thread.sleep(500);
+
+        // Publisher
+        AtomicInteger sentCount = new AtomicInteger();
+        List<Integer> sent = sendMessageCount <= 1000 ? new 
ArrayList<Integer>() : null;
+        Session publisherSession = buildSession("Publisher", URL, 
optimizeAcknowledge);
+        MessageProducer publisher = createPublisher(publisherSession);
+        for (int i = 0; i < sendMessageCount; i++) {
+            sentCount.incrementAndGet();
+            if (sent != null) sent.add(i);
+            if (i % 10000 == 0) System.out.println("Publisher: Send " + i);
+            
publisher.send(publisherSession.createTextMessage(Integer.toString(i)));
+        }
+
+        // Wait for messages to arrive
+        Thread.sleep(500);
+
+        System.out.println(name + ": Publisher Sent: " + sentCount + " " + 
sent);
+        System.out.println(name + ": Whilst slow consumer blocked:");
+        System.out.println("\t\t- SlowConsumer Received: " + 
slowConsumerReceiveCount + " " + slowConsumerReceived);
+        System.out.println("\t\t- FastConsumer Received: " + 
fastConsumerReceiveCount + " " + fastConsumerReceived);
+
+        // Unblock slow consumer
+        blockSlowConsumer.countDown();
+
+        // Wait for messages to arrive
+        Thread.sleep(500);
+
+        System.out.println(name + ": After slow consumer unblocked:");
+        System.out.println("\t\t- SlowConsumer Received: " + 
slowConsumerReceiveCount + " " + slowConsumerReceived);
+        System.out.println("\t\t- FastConsumer Received: " + 
fastConsumerReceiveCount + " " + fastConsumerReceived);
+        System.out.println();
+
+        publisher.close();
+        publisherSession.close();
+        slowConsumer.close();
+        slowConsumerSession.close();
+        fastConsumer.close();
+        fastConsumerSession.close();
+        broker.stop();
+
+        Assert.assertEquals("Fast consumer missed messages whilst slow 
consumer was blocking", sent, fastConsumerReceived);
+        // this is too timine dependent  as sometimes there is message 
eviction, would need to check the dlq
+        //Assert.assertEquals("Slow consumer received incorrect message 
count", Math.min(sendMessageCount, prefetchLimit + (messageLimit > 0 ? 
messageLimit : Integer.MAX_VALUE)), slowConsumerReceived.size());
+    }
+
+    private static BrokerService createBroker(boolean persistent) throws 
Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName("TestBroker");
+        broker.setPersistent(persistent);
+        broker.addConnector(URL);
+        return broker;
+    }
+
+    private static MessageConsumer createSubscriber(Session session, 
MessageListener messageListener) throws JMSException {
+        MessageConsumer consumer = session.createConsumer(TOPIC);
+        consumer.setMessageListener(messageListener);
+        return consumer;
+    }
+
+    private static MessageProducer createPublisher(Session session) throws 
JMSException {
+        MessageProducer producer = session.createProducer(TOPIC);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        return producer;
+    }
+
+    private static Session buildSession(String clientId, String url, boolean 
optimizeAcknowledge) throws JMSException {
+        ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(url);
+
+        connectionFactory.setCopyMessageOnSend(false);
+        connectionFactory.setDisableTimeStampsByDefault(true);
+        connectionFactory.setOptimizeAcknowledge(optimizeAcknowledge);
+        if (optimizeAcknowledge) {
+            connectionFactory.setOptimizeAcknowledgeTimeOut(1);
+        }
+
+        Connection connection = connectionFactory.createConnection();
+        connection.setClientID(clientId);
+
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+        connection.start();
+
+        return session;
+    }
+
+    private static PolicyMap buildPolicy(ActiveMQTopic topic, int 
prefetchLimit, int messageLimit, boolean evictOldestMessage, boolean 
disableFlowControl) {
+        PolicyMap policyMap = new PolicyMap();
+
+        PolicyEntry policyEntry = new PolicyEntry();
+
+        if (evictOldestMessage) {
+            policyEntry.setMessageEvictionStrategy(new 
OldestMessageEvictionStrategy());
+        }
+
+        if (disableFlowControl) {
+            policyEntry.setProducerFlowControl(false);
+        }
+
+        if (prefetchLimit > 0) {
+            policyEntry.setTopicPrefetch(prefetchLimit);
+        }
+
+        if (messageLimit > 0) {
+            ConstantPendingMessageLimitStrategy messageLimitStrategy = new 
ConstantPendingMessageLimitStrategy();
+            messageLimitStrategy.setLimit(messageLimit);
+            policyEntry.setPendingMessageLimitStrategy(messageLimitStrategy);
+        }
+
+        policyMap.put(topic, policyEntry);
+
+        return policyMap;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java
new file mode 100644
index 0000000..8c580a9
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/ConnectionPerMessageTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.activemq.bugs;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConnectionPerMessageTest extends EmbeddedBrokerTestSupport {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ConnectionPerMessageTest.class);
+       private static final int COUNT = 2000;
+       protected String bindAddress;
+
+       public void testConnectionPerMessage() throws Exception {
+               final String topicName = "test.topic";
+
+               LOG.info("Initializing connection factory for JMS to URL: "
+                               + bindAddress);
+               final ActiveMQConnectionFactory normalFactory = new 
ActiveMQConnectionFactory();
+               normalFactory.setBrokerURL(bindAddress);
+               for (int i = 0; i < COUNT; i++) {
+
+                       if (i % 100 == 0) {
+                               LOG.info(new Integer(i).toString());
+                       }
+
+                       Connection conn = null;
+                       try {
+
+                               conn = normalFactory.createConnection();
+                               final Session session = 
conn.createSession(false,
+                                               Session.AUTO_ACKNOWLEDGE);
+                               final Topic topic = 
session.createTopic(topicName);
+                               final MessageProducer producer = 
session.createProducer(topic);
+                               
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+                               final MapMessage m = session.createMapMessage();
+                               m.setInt("hey", i);
+
+                               producer.send(m);
+
+                       } catch (JMSException e) {
+                               LOG.warn(e.getMessage(), e);
+                       } finally {
+                               if (conn != null)
+                                       try {
+                                               conn.close();
+                                       } catch (JMSException e) {
+                                               LOG.warn(e.getMessage(), e);
+                                       }
+                       }
+               }
+       }
+
+       protected void setUp() throws Exception {
+               bindAddress = "vm://localhost";
+               super.setUp();
+       }
+
+       protected BrokerService createBroker() throws Exception {
+               BrokerService answer = new BrokerService();
+        answer.setDeleteAllMessagesOnStartup(true);
+               answer.setUseJmx(false);
+               answer.setPersistent(isPersistent());
+               answer.addConnector(bindAddress);
+               return answer;
+       }
+
+       protected boolean isPersistent() {
+               return true;
+       }
+
+       protected void tearDown() throws Exception {
+               super.tearDown();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java
new file mode 100644
index 0000000..f956da6
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.command.ActiveMQQueue;
+
+public class CraigsBugTest extends EmbeddedBrokerTestSupport {
+
+    private String connectionUri;
+
+    public void testConnectionFactory() throws Exception {
+        final ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory(connectionUri);
+        final ActiveMQQueue queue = new ActiveMQQueue("testqueue");
+        final Connection conn = cf.createConnection();
+
+        Runnable r = new Runnable() {
+            public void run() {
+                try {
+                    Session session = conn.createSession(false, 1);
+                    MessageConsumer consumer = session.createConsumer(queue, 
null);
+                    consumer.receive(1000);
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+        new Thread(r).start();
+        conn.start();
+
+        try {
+            synchronized (this) {
+                wait(3000);
+            }
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    protected void setUp() throws Exception {
+        bindAddress = "tcp://localhost:0";
+        super.setUp();
+
+        connectionUri = 
broker.getTransportConnectors().get(0).getPublishableConnectString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java
new file mode 100644
index 0000000..bb66943
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DoubleExpireTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.util.concurrent.TimeoutException;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.region.Queue;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.junit.Assert;
+
+public class DoubleExpireTest extends EmbeddedBrokerTestSupport {
+
+       private static final long MESSAGE_TTL_MILLIS = 1000;
+       private static final long MAX_TEST_TIME_MILLIS = 60000;
+
+       public void setUp() throws Exception {
+               setAutoFail(true);
+               setMaxTestTime(MAX_TEST_TIME_MILLIS);
+               super.setUp();
+       }
+
+       /**
+        * This test verifies that a message that expires can be be resent to 
queue
+        * with a new expiration and that it will be processed as a new message 
and
+        * allowed to re-expire.
+        * <p>
+        * <b>NOTE:</b> This test fails on AMQ 5.4.2 because the 
originalExpiration
+        * timestamp is not cleared when the message is resent.
+        */
+       public void testDoubleExpireWithoutMove() throws Exception {
+               // Create the default dead letter queue.
+               final ActiveMQDestination DLQ = 
createDestination("ActiveMQ.DLQ");
+
+               Connection conn = createConnection();
+               try {
+                       conn.start();
+                       Session session = conn.createSession(false,
+                                       Session.AUTO_ACKNOWLEDGE);
+
+                       // Verify that the test queue and DLQ are empty.
+                       Assert.assertEquals(0, getSize(destination));
+                       Assert.assertEquals(0, getSize(DLQ));
+
+                       // Enqueue a message to the test queue that will expire 
after 1s.
+                       MessageProducer producer = 
session.createProducer(destination);
+                       Message testMessage = session.createTextMessage("test 
message");
+                       producer.send(testMessage, 
Message.DEFAULT_DELIVERY_MODE,
+                                       Message.DEFAULT_PRIORITY, 
MESSAGE_TTL_MILLIS);
+                       Assert.assertEquals(1, getSize(destination));
+
+                       // Wait for the message to expire.
+                       waitForSize(destination, 0, MAX_TEST_TIME_MILLIS);
+                       Assert.assertEquals(1, getSize(DLQ));
+
+                       // Consume the message from the DLQ and re-enqueue it 
to the test
+                       // queue so that it expires after 1s.
+                       MessageConsumer consumer = session.createConsumer(DLQ);
+                       Message expiredMessage = consumer.receive();
+                       Assert.assertEquals(testMessage.getJMSMessageID(), 
expiredMessage
+                                       .getJMSMessageID());
+
+                       producer.send(expiredMessage, 
Message.DEFAULT_DELIVERY_MODE,
+                                       Message.DEFAULT_PRIORITY, 
MESSAGE_TTL_MILLIS);
+                       Assert.assertEquals(1, getSize(destination));
+                       Assert.assertEquals(0, getSize(DLQ));
+
+                       // Verify that the resent message is "different" in 
that it has
+                       // another ID.
+                       Assert.assertNotSame(testMessage.getJMSMessageID(), 
expiredMessage
+                                       .getJMSMessageID());
+
+                       // Wait for the message to re-expire.
+                       waitForSize(destination, 0, MAX_TEST_TIME_MILLIS);
+                       Assert.assertEquals(1, getSize(DLQ));
+
+                       // Re-consume the message from the DLQ.
+                       Message reexpiredMessage = consumer.receive();
+                       Assert.assertEquals(expiredMessage.getJMSMessageID(), 
reexpiredMessage
+                                       .getJMSMessageID());
+               } finally {
+                       conn.close();
+               }
+       }
+
+       /**
+        * A helper method that returns the embedded broker's implementation of 
a
+        * JMS queue.
+        */
+       private Queue getPhysicalDestination(ActiveMQDestination destination)
+                       throws Exception {
+               return (Queue) 
broker.getAdminView().getBroker().getDestinationMap()
+                               .get(destination);
+       }
+
+       /**
+        * A helper method that returns the size of the specified queue/topic.
+        */
+       private long getSize(ActiveMQDestination destination) throws Exception {
+               return getPhysicalDestination(destination) != null ? 
getPhysicalDestination(
+                               
destination).getDestinationStatistics().getMessages()
+                               .getCount()
+                               : 0;
+       }
+
+       /**
+        * A helper method that waits for a destination to reach a certain size.
+        */
+       private void waitForSize(ActiveMQDestination destination, int size,
+                       long timeoutMillis) throws Exception, TimeoutException {
+               long startTimeMillis = System.currentTimeMillis();
+
+               while (getSize(destination) != size
+                               && System.currentTimeMillis() < 
(startTimeMillis + timeoutMillis)) {
+                       Thread.sleep(250);
+               }
+
+               if (getSize(destination) != size) {
+                       throw new TimeoutException("Destination "
+                                       + destination.getPhysicalName() + " did 
not reach size "
+                                       + size + " within " + timeoutMillis + 
"ms.");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
new file mode 100644
index 0000000..eeee82b
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import javax.management.ObjectName;
+import junit.framework.Test;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  A Test case for AMQ-1479
+ */
+public class DurableConsumerTest extends CombinationTestSupport{
+    private static final Logger LOG = 
LoggerFactory.getLogger(DurableConsumerTest.class);
+    private static int COUNT = 1024;
+    private static String CONSUMER_NAME = "DURABLE_TEST";
+    protected BrokerService broker;
+    
+    protected String bindAddress = "tcp://localhost:61616";
+    
+    protected byte[] payload = new byte[1024 * 32];
+    protected ConnectionFactory factory;
+    protected Vector<Exception> exceptions = new Vector<Exception>();
+    
+    private static final String TOPIC_NAME = "failoverTopic";
+    private static final String CONNECTION_URL = 
"failover:(tcp://localhost:61616,tcp://localhost:61617)";
+    public boolean useDedicatedTaskRunner = false;
+    
+    private class SimpleTopicSubscriber implements 
MessageListener,ExceptionListener{
+        
+        private TopicConnection topicConnection = null;
+        
+        public SimpleTopicSubscriber(String connectionURL,String 
clientId,String topicName) {
+            
+            ActiveMQConnectionFactory topicConnectionFactory = null;
+            TopicSession topicSession = null;
+            Topic topic = null;
+            TopicSubscriber topicSubscriber = null;
+            
+            topicConnectionFactory = new 
ActiveMQConnectionFactory(connectionURL);
+            try {
+                
+                topic = new ActiveMQTopic(topicName);
+                topicConnection = 
topicConnectionFactory.createTopicConnection();
+                topicConnection.setClientID((clientId));
+                topicConnection.start();
+                
+                topicSession = topicConnection.createTopicSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                topicSubscriber = topicSession.createDurableSubscriber(topic, 
(clientId));
+                topicSubscriber.setMessageListener(this);
+                
+            } catch (JMSException e) {
+                e.printStackTrace();
+            }
+        }
+        
+        public void onMessage(Message arg0){
+        }
+        
+        public void closeConnection(){
+            if (topicConnection != null) {
+                try {
+                    topicConnection.close();
+                } catch (JMSException e) {
+                }
+            }
+        }
+        
+        public void onException(JMSException exception){
+            exceptions.add(exception);
+        }
+    }
+    
+    private class MessagePublisher implements Runnable{
+        private final boolean shouldPublish = true;
+        
+        public void run(){
+            TopicConnectionFactory topicConnectionFactory = null;
+            TopicConnection topicConnection = null;
+            TopicSession topicSession = null;
+            Topic topic = null;
+            TopicPublisher topicPublisher = null;
+            Message message = null;
+            
+            topicConnectionFactory = new 
ActiveMQConnectionFactory(CONNECTION_URL);
+            try {
+                topic = new ActiveMQTopic(TOPIC_NAME);
+                topicConnection = 
topicConnectionFactory.createTopicConnection();
+                topicSession = topicConnection.createTopicSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                topicPublisher = topicSession.createPublisher(topic);
+                message = topicSession.createMessage();
+            } catch (Exception ex) {
+                exceptions.add(ex);
+            }
+            while (shouldPublish) {
+                try {
+                    topicPublisher.publish(message, DeliveryMode.PERSISTENT, 
1, 2 * 60 * 60 * 1000);
+                } catch (JMSException ex) {
+                    exceptions.add(ex);
+                }
+                try {
+                    Thread.sleep(1);
+                } catch (Exception ex) {
+                }
+            }
+        }
+    }
+    
+    private void configurePersistence(BrokerService broker) throws Exception{
+        File dataDirFile = new File("target/" + getName());
+        KahaDBPersistenceAdapter kahaDBAdapter = new 
KahaDBPersistenceAdapter();
+        kahaDBAdapter.setDirectory(dataDirFile);
+        broker.setPersistenceAdapter(kahaDBAdapter);
+    }
+    
+    public void testFailover() throws Exception{
+        
+        configurePersistence(broker);
+        broker.start();
+        
+        Thread publisherThread = new Thread(new MessagePublisher());
+        publisherThread.start();
+        final int numSubs = 100;
+        final List<SimpleTopicSubscriber> list = new 
ArrayList<SimpleTopicSubscriber>(numSubs);
+        for (int i = 0; i < numSubs; i++) {
+            
+            final int id = i;
+            Thread thread = new Thread(new Runnable(){
+                public void run(){
+                    SimpleTopicSubscriber s =new 
SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis() + "-" + id, 
TOPIC_NAME);
+                    list.add(s);
+                }
+            });
+            thread.start();
+            
+        }
+
+        Wait.waitFor(new Wait.Condition(){
+            @Override
+            public boolean isSatisified() throws Exception {
+                return numSubs == list.size();
+            }
+        });
+
+        broker.stop();
+        broker = createBroker(false);
+        configurePersistence(broker);
+        broker.start();
+        Thread.sleep(10000);
+        for (SimpleTopicSubscriber s:list) {
+            s.closeConnection();
+        }
+        assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+    }
+    
+    // makes heavy use of threads and can demonstrate 
https://issues.apache.org/activemq/browse/AMQ-2028
+    // with use dedicatedTaskRunner=true and produce OOM
+    public void initCombosForTestConcurrentDurableConsumer(){
+        addCombinationValues("useDedicatedTaskRunner", new Object[] { 
Boolean.TRUE, Boolean.FALSE });
+    }
+    
+    public void testConcurrentDurableConsumer() throws Exception{
+        
+        broker.start();
+        broker.waitUntilStarted();
+        
+        factory = createConnectionFactory();
+        final String topicName = getName();
+        final int numMessages = 500;
+        int numConsumers = 1;
+        final CountDownLatch counsumerStarted = new 
CountDownLatch(numConsumers);
+        final AtomicInteger receivedCount = new AtomicInteger();
+        Runnable consumer = new Runnable(){
+            public void run(){
+                final String consumerName = Thread.currentThread().getName();
+                int acked = 0;
+                int received = 0;
+                
+                try {
+                    while (acked < numMessages / 2) {
+                        // take one message and close, ack on occasion
+                        Connection consumerConnection = 
factory.createConnection();
+                        ((ActiveMQConnection) 
consumerConnection).setWatchTopicAdvisories(false);
+                        consumerConnection.setClientID(consumerName);
+                        Session consumerSession = 
consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+                        Topic topic = consumerSession.createTopic(topicName);
+                        consumerConnection.start();
+                        
+                        MessageConsumer consumer = 
consumerSession.createDurableSubscriber(topic, consumerName);
+                        
+                        counsumerStarted.countDown();
+                        Message msg = null;
+                        do {
+                            msg = consumer.receive(5000);
+                            if (msg != null) {
+                                receivedCount.incrementAndGet();
+                                if (received != 0 && received % 100 == 0) {
+                                    LOG.info("Received msg: " + 
msg.getJMSMessageID());
+                                }
+                                if (++received % 2 == 0) {
+                                    msg.acknowledge();
+                                    acked++;
+                                }
+                            }
+                        } while (msg == null);
+
+                        consumerConnection.close();
+                    }
+                    assertTrue(received >= acked);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    exceptions.add(e);
+                }
+            }
+        };
+        
+        ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
+        
+        for (int i = 0; i < numConsumers; i++) {
+            executor.execute(consumer);
+        }
+        
+        assertTrue(counsumerStarted.await(30, TimeUnit.SECONDS));
+        
+        Connection producerConnection = factory.createConnection();
+        ((ActiveMQConnection) 
producerConnection).setWatchTopicAdvisories(false);
+        Session producerSession = producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Topic topic = producerSession.createTopic(topicName);
+        MessageProducer producer = producerSession.createProducer(topic);
+        producerConnection.start();
+        for (int i = 0; i < numMessages; i++) {
+            BytesMessage msg = producerSession.createBytesMessage();
+            msg.writeBytes(payload);
+            producer.send(msg);
+            if (i != 0 && i % 100 == 0) {
+                LOG.info("Sent msg " + i);
+            }
+        }
+        
+        executor.shutdown();
+        executor.awaitTermination(30, TimeUnit.SECONDS);
+        
+        Wait.waitFor(new Wait.Condition(){
+            public boolean isSatisified() throws Exception{
+                LOG.info("receivedCount: " + receivedCount.get());
+                return receivedCount.get() == numMessages;
+            }
+        }, 360 * 1000);
+        assertEquals("got required some messages", numMessages, 
receivedCount.get());
+        assertTrue("no exceptions, but: " + exceptions, exceptions.isEmpty());
+    }
+    
+    public void testConsumerRecover() throws Exception{
+        doTestConsumer(true);
+    }
+    
+    public void testConsumer() throws Exception{
+        doTestConsumer(false);
+    }
+
+    public void testPrefetchViaBrokerConfig() throws Exception {
+
+        Integer prefetchVal = new Integer(150);
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setDurableTopicPrefetch(prefetchVal.intValue());
+        policyEntry.setPrioritizedMessages(true);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(policyEntry);
+        broker.setDestinationPolicy(policyMap);
+        broker.start();
+
+        factory = createConnectionFactory();
+        Connection consumerConnection = factory.createConnection();
+        consumerConnection.setClientID(CONSUMER_NAME);
+        Session consumerSession = consumerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Topic topic = consumerSession.createTopic(getClass().getName());
+        MessageConsumer consumer = 
consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
+        consumerConnection.start();
+
+        ObjectName activeSubscriptionObjectName = 
broker.getAdminView().getDurableTopicSubscribers()[0];
+        Object prefetchFromSubView = 
broker.getManagementContext().getAttribute(activeSubscriptionObjectName, 
"PrefetchSize");
+        assertEquals(prefetchVal, prefetchFromSubView);
+    }
+    
+    public void doTestConsumer(boolean forceRecover) throws Exception{
+        
+        if (forceRecover) {
+            configurePersistence(broker);
+        }
+        broker.start();
+        
+        factory = createConnectionFactory();
+        Connection consumerConnection = factory.createConnection();
+        consumerConnection.setClientID(CONSUMER_NAME);
+        Session consumerSession = consumerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Topic topic = consumerSession.createTopic(getClass().getName());
+        MessageConsumer consumer = 
consumerSession.createDurableSubscriber(topic, CONSUMER_NAME);
+        consumerConnection.start();
+        consumerConnection.close();
+        broker.stop();
+        broker = createBroker(false);
+        if (forceRecover) {
+            configurePersistence(broker);
+        }
+        broker.start();
+        
+        Connection producerConnection = factory.createConnection();
+        
+        Session producerSession = producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        
+        MessageProducer producer = producerSession.createProducer(topic);
+        producerConnection.start();
+        for (int i = 0; i < COUNT; i++) {
+            BytesMessage msg = producerSession.createBytesMessage();
+            msg.writeBytes(payload);
+            producer.send(msg);
+            if (i != 0 && i % 1000 == 0) {
+                LOG.info("Sent msg " + i);
+            }
+        }
+        producerConnection.close();
+        broker.stop();
+        broker = createBroker(false);
+        if (forceRecover) {
+            configurePersistence(broker);
+        }
+        broker.start();
+        
+        consumerConnection = factory.createConnection();
+        consumerConnection.setClientID(CONSUMER_NAME);
+        consumerConnection.start();
+        consumerSession = consumerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        
+        consumer = consumerSession.createDurableSubscriber(topic, 
CONSUMER_NAME);
+        for (int i = 0; i < COUNT; i++) {
+            Message msg = consumer.receive(10000);
+            assertNotNull("Missing message: " + i, msg);
+            if (i != 0 && i % 1000 == 0) {
+                LOG.info("Received msg " + i);
+            }
+            
+        }
+        consumerConnection.close();
+        
+    }
+    
+    @Override
+    protected void setUp() throws Exception{
+        if (broker == null) {
+            broker = createBroker(true);
+        }
+        
+        super.setUp();
+    }
+    
+    @Override
+    protected void tearDown() throws Exception{
+        super.tearDown();
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+            broker = null;
+        }
+    }
+    
+    protected Topic creatTopic(Session s,String destinationName) throws 
JMSException{
+        return s.createTopic(destinationName);
+    }
+    
+    /**
+     * Factory method to create a new broker
+     * 
+     * @throws Exception
+     */
+    protected BrokerService createBroker(boolean deleteStore) throws Exception{
+        BrokerService answer = new BrokerService();
+        configureBroker(answer, deleteStore);
+        return answer;
+    }
+    
+    protected void configureBroker(BrokerService answer,boolean deleteStore) 
throws Exception{
+        answer.setDeleteAllMessagesOnStartup(deleteStore);
+        KahaDBStore kaha = new KahaDBStore();
+        //kaha.setConcurrentStoreAndDispatchTopics(false);
+        File directory = new File("target/activemq-data/kahadb");
+        if (deleteStore) {
+            IOHelper.deleteChildren(directory);
+        }
+        kaha.setDirectory(directory);
+        //kaha.setMaxAsyncJobs(10);
+        
+        answer.setPersistenceAdapter(kaha);
+        answer.addConnector(bindAddress);
+        answer.setUseShutdownHook(false);
+        answer.setAdvisorySupport(false);
+        answer.setDedicatedTaskRunner(useDedicatedTaskRunner);
+    }
+    
+    protected ActiveMQConnectionFactory createConnectionFactory() throws 
Exception{
+        ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(bindAddress);
+        factory.setUseDedicatedTaskRunner(useDedicatedTaskRunner);
+        return factory;
+    }
+    
+    public static Test suite(){
+        return suite(DurableConsumerTest.class);
+    }
+    
+    public static void main(String[] args){
+        junit.textui.TestRunner.run(suite());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java
new file mode 100644
index 0000000..80c4e9f
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JMSDurableTopicNoLocalTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+
+/**
+ * 
+ */
+public class JMSDurableTopicNoLocalTest extends EmbeddedBrokerTestSupport {
+    protected String bindAddress;
+
+    public void testConsumeNoLocal() throws Exception {
+        final String TEST_NAME = getClass().getName();
+        Connection connection = createConnection();
+        connection.setClientID(TEST_NAME);
+        
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        
+        TopicSubscriber subscriber = session.createDurableSubscriber((Topic) 
destination, "topicUser2", null, true);
+        
+        
+        final CountDownLatch latch = new CountDownLatch(1);
+        subscriber.setMessageListener(new MessageListener() {
+            public void onMessage(Message message) {
+                System.out.println("Receive a message " + message);
+                latch.countDown();        
+            }   
+        });
+        
+        connection.start();
+        
+        MessageProducer producer = session.createProducer(destination);
+        TextMessage message = session.createTextMessage("THIS IS A TEST");
+        producer.send(message);
+        producer.close();
+        latch.await(5,TimeUnit.SECONDS);
+        assertEquals(latch.getCount(),1);
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        bindAddress = "vm://localhost";
+        useTopic=true;
+        super.setUp();
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setUseJmx(false);
+        answer.setPersistent(true);
+        answer.setDeleteAllMessagesOnStartup(true);
+        answer.addConnector(bindAddress);
+        return answer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
new file mode 100644
index 0000000..05a8c1d
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsDurableTopicSlowReceiveTest.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.Properties;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.test.JmsTopicSendReceiveTest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ */
+public class JmsDurableTopicSlowReceiveTest extends JmsTopicSendReceiveTest {
+    
+    static final int NMSG = 200;
+    static final int MSIZE = 256000;
+    private static final transient Logger LOG = 
LoggerFactory.getLogger(JmsDurableTopicSlowReceiveTest.class);
+    private static final String COUNT_PROPERY_NAME = "count";
+
+    protected Connection connection2;
+    protected Session session2;
+    protected Session consumeSession2;
+    protected MessageConsumer consumer2;
+    protected MessageProducer producer2;
+    protected Destination consumerDestination2;
+    BrokerService broker;
+    private Connection connection3;
+    private Session consumeSession3;
+    private TopicSubscriber consumer3;
+
+    /**
+     * Set up a durable suscriber test.
+     * 
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        this.durable = true;
+        broker = createBroker();
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        broker.stop();
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws 
Exception {
+        ActiveMQConnectionFactory result = new 
ActiveMQConnectionFactory("vm://localhost?async=false");
+        Properties props = new Properties();
+        props.put("prefetchPolicy.durableTopicPrefetch", "5");
+        props.put("prefetchPolicy.optimizeDurableTopicPrefetch", "5");
+        result.setProperties(props);
+        return result;
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        configureBroker(answer);
+        answer.start();
+        return answer;
+    }
+
+    protected void configureBroker(BrokerService answer) throws Exception {
+        answer.setDeleteAllMessagesOnStartup(true);
+    }
+
+    /**
+     * Test if all the messages sent are being received.
+     * 
+     * @throws Exception
+     */
+    public void testSlowReceiver() throws Exception {
+        connection2 = createConnection();
+        connection2.setClientID("test");
+        connection2.start();
+        consumeSession2 = connection2.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        consumerDestination2 = session2.createTopic(getConsumerSubject() + 
"2");
+        consumer2 = 
consumeSession2.createDurableSubscriber((Topic)consumerDestination2, getName());
+
+        consumer2.close();
+        connection2.close();
+        new Thread(new Runnable() {
+
+            public void run() {
+                try {
+                    int count = 0;
+                    for (int loop = 0; loop < 4; loop++) {
+                        connection2 = createConnection();
+                        connection2.start();
+                        session2 = connection2.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                        producer2 = session2.createProducer(null);
+                        producer2.setDeliveryMode(deliveryMode);
+                        Thread.sleep(1000);
+                        for (int i = 0; i < NMSG / 4; i++) {
+                            BytesMessage message = 
session2.createBytesMessage();
+                            message.writeBytes(new byte[MSIZE]);
+                            message.setStringProperty("test", "test");
+                            message.setIntProperty(COUNT_PROPERY_NAME, count);
+                            message.setJMSType("test");
+                            producer2.send(consumerDestination2, message);
+                            Thread.sleep(50);
+                            if (verbose) {
+                                LOG.debug("Sent(" + loop + "): " + i);
+                            }
+                            count++;
+                        }
+                        producer2.close();
+                        connection2.stop();
+                        connection2.close();
+                    }
+                } catch (Throwable e) {
+                    e.printStackTrace();
+                }
+            }
+        }, "SENDER Thread").start();
+        connection3 = createConnection();
+        connection3.setClientID("test");
+        connection3.start();
+        consumeSession3 = connection3.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+        consumer3 = 
consumeSession3.createDurableSubscriber((Topic)consumerDestination2, getName());
+        connection3.close();
+        int count = 0;
+        for (int loop = 0; loop < 4; ++loop) {
+            connection3 = createConnection();
+            connection3.setClientID("test");
+            connection3.start();
+            consumeSession3 = connection3.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+            consumer3 = 
consumeSession3.createDurableSubscriber((Topic)consumerDestination2, getName());
+            Message msg = null;
+            int i;
+            for (i = 0; i < NMSG / 4; i++) {
+                msg = consumer3.receive(10000);
+                if (msg == null) {
+                    break;
+                }
+                if (verbose) {
+                    LOG.debug("Received(" + loop + "): " + i + " count = " + 
msg.getIntProperty(COUNT_PROPERY_NAME));
+                }
+                assertNotNull(msg);
+                assertEquals(msg.getJMSType(), "test");
+                assertEquals(msg.getStringProperty("test"), "test");
+                assertEquals("Messages received out of order", count, 
msg.getIntProperty(COUNT_PROPERY_NAME));
+                Thread.sleep(500);
+                msg.acknowledge();
+                count++;
+            }
+            consumer3.close();
+            assertEquals("Receiver " + loop, NMSG / 4, i);
+            connection3.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
new file mode 100644
index 0000000..2858302
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/JmsTimeoutTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.ResourceAllocationException;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.transport.RequestTimedOutIOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JmsTimeoutTest extends EmbeddedBrokerTestSupport {
+
+    static final Logger LOG = LoggerFactory.getLogger(JmsTimeoutTest.class);
+
+    private final int messageSize=1024*64;
+    private final int messageCount=10000;
+    private final AtomicInteger exceptionCount = new AtomicInteger(0);
+
+    /**
+     * Test the case where the broker is blocked due to a memory limit
+     * and a producer timeout is set on the connection.
+     * @throws Exception
+     */
+    public void testBlockedProducerConnectionTimeout() throws Exception {
+        final ActiveMQConnection cx = (ActiveMQConnection)createConnection();
+        final ActiveMQDestination queue = createDestination("testqueue");
+
+        // we should not take longer than 10 seconds to return from send
+        cx.setSendTimeout(10000);
+
+        Runnable r = new Runnable() {
+            public void run() {
+                try {
+                    LOG.info("Sender thread starting");
+                    Session session = cx.createSession(false, 1);
+                    MessageProducer producer = session.createProducer(queue);
+                    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+                    TextMessage message = 
session.createTextMessage(createMessageText());
+                    for(int count=0; count<messageCount; count++){
+                        producer.send(message);
+                    }
+                    LOG.info("Done sending..");
+                } catch (JMSException e) {
+                    if (e.getCause() instanceof RequestTimedOutIOException) {
+                        exceptionCount.incrementAndGet();
+                    } else {
+                        e.printStackTrace();
+                    }
+                    return;
+                }
+
+            }
+        };
+        cx.start();
+        Thread producerThread = new Thread(r);
+        producerThread.start();
+        producerThread.join(30000);
+        cx.close();
+        // We should have a few timeout exceptions as memory store will fill up
+        assertTrue("No exception from the broker", exceptionCount.get() > 0);
+    }
+
+    /**
+     * Test the case where the broker is blocked due to a memory limit
+     * with a fail timeout
+     * @throws Exception
+     */
+    public void testBlockedProducerUsageSendFailTimeout() throws Exception {
+        final ActiveMQConnection cx = (ActiveMQConnection)createConnection();
+        final ActiveMQDestination queue = createDestination("testqueue");
+
+        broker.getSystemUsage().setSendFailIfNoSpaceAfterTimeout(5000);
+        Runnable r = new Runnable() {
+            public void run() {
+                try {
+                    LOG.info("Sender thread starting");
+                    Session session = cx.createSession(false, 1);
+                    MessageProducer producer = session.createProducer(queue);
+                    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+                    TextMessage message = 
session.createTextMessage(createMessageText());
+                    for(int count=0; count<messageCount; count++){
+                        producer.send(message);
+                    }
+                    LOG.info("Done sending..");
+                } catch (JMSException e) {
+                    if (e instanceof ResourceAllocationException || 
e.getCause() instanceof RequestTimedOutIOException) {
+                        exceptionCount.incrementAndGet();
+                    } else {
+                        e.printStackTrace();
+                    }
+                    return;
+                }
+            }
+        };
+        cx.start();
+        Thread producerThread = new Thread(r);
+        producerThread.start();
+        producerThread.join(30000);
+        cx.close();
+        // We should have a few timeout exceptions as memory store will fill up
+        assertTrue("No exception from the broker", exceptionCount.get() > 0);
+    }
+
+    protected void setUp() throws Exception {
+        exceptionCount.set(0);
+        bindAddress = "tcp://localhost:0";
+        broker = createBroker();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.getSystemUsage().getMemoryUsage().setLimit(5*1024*1024);
+
+        super.setUp();
+    }
+
+    @Override
+    protected ConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(
+            
broker.getTransportConnectors().get(0).getPublishableConnectString());
+    }
+
+    private String createMessageText() {
+        StringBuffer buffer = new StringBuffer();
+        buffer.append("<filler>");
+        for (int i = buffer.length(); i < messageSize; i++) {
+            buffer.append('X');
+        }
+        buffer.append("</filler>");
+        return buffer.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBlockResumeTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBlockResumeTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBlockResumeTest.java
new file mode 100644
index 0000000..e8d5371
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBlockResumeTest.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.File;
+import java.util.Vector;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.junit.runner.RunWith;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
+
+
+@RunWith(BlockJUnit4ClassRunner.class)
+public class MemoryUsageBlockResumeTest extends TestSupport implements 
Thread.UncaughtExceptionHandler {
+
+    public int deliveryMode = DeliveryMode.PERSISTENT;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MemoryUsageBlockResumeTest.class);
+    private static byte[] buf = new byte[4 * 1024];
+    private static byte[] bigBuf = new byte[48 * 1024];
+
+    private BrokerService broker;
+    AtomicInteger messagesSent = new AtomicInteger(0);
+    AtomicInteger messagesConsumed = new AtomicInteger(0);
+
+    protected long messageReceiveTimeout = 10000L;
+
+    Destination destination = new ActiveMQQueue("FooTwo");
+    Destination bigDestination = new ActiveMQQueue("FooTwoBig");
+
+    private String connectionUri;
+    private final Vector<Throwable> exceptions = new Vector<Throwable>();
+
+    @Test(timeout = 60 * 1000)
+    public void testBlockByOtherResumeNoException() throws Exception {
+
+        ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(connectionUri);
+
+        // ensure more than on message can be pending when full
+        factory.setProducerWindowSize(48*1024);
+        // ensure messages are spooled to disk for this consumer
+        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
+        prefetch.setTopicPrefetch(10);
+        factory.setPrefetchPolicy(prefetch);
+        Connection consumerConnection = factory.createConnection();
+        consumerConnection.start();
+
+        Session consumerSession = consumerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = 
consumerSession.createConsumer(bigDestination);
+
+        final Connection producerConnection = factory.createConnection();
+        producerConnection.start();
+
+        final int fillWithBigCount = 10;
+        Session session = producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(null);
+        producer.setDeliveryMode(deliveryMode);
+        for (int idx = 0; idx < fillWithBigCount; ++idx) {
+            Message message = session.createTextMessage(new String(bigBuf) + 
idx);
+            producer.send(bigDestination, message);
+            messagesSent.incrementAndGet();
+            LOG.info("After big: " + idx + ", System Memory Usage " + 
broker.getSystemUsage().getMemoryUsage().getPercentUsage());
+        }
+
+        // will block on pfc
+        final int toSend = 20;
+        Thread producingThread = new Thread("Producing thread") {
+            @Override
+            public void run() {
+                try {
+                    Session session = producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer producer = 
session.createProducer(destination);
+                    producer.setDeliveryMode(deliveryMode);
+                    for (int idx = 0; idx < toSend; ++idx) {
+                        Message message = session.createTextMessage(new 
String(buf) + idx);
+                        producer.send(destination, message);
+                        messagesSent.incrementAndGet();
+                        LOG.info("After little:" + idx + ", System Memory 
Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
+                    }
+                } catch (Throwable ex) {
+                    ex.printStackTrace();
+                }
+            }
+        };
+        producingThread.start();
+
+        Thread producingThreadTwo = new Thread("Producing thread") {
+            @Override
+            public void run() {
+                try {
+                    Session session = producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+                    MessageProducer producer = 
session.createProducer(destination);
+                    producer.setDeliveryMode(deliveryMode);
+                    for (int idx = 0; idx < toSend; ++idx) {
+                        Message message = session.createTextMessage(new 
String(buf) + idx);
+                        producer.send(destination, message);
+                        messagesSent.incrementAndGet();
+                        LOG.info("After little:" + idx + ", System Memory 
Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
+                    }
+                } catch (Throwable ex) {
+                    ex.printStackTrace();
+                }
+            }
+        };
+        producingThreadTwo.start();
+
+        assertTrue("producer has sent x in a reasonable time", 
Wait.waitFor(new Wait.Condition()
+        {
+            @Override
+            public boolean isSatisified() throws Exception {
+                 LOG.info("Checking for : X sent, System Memory Usage " + 
broker.getSystemUsage().getMemoryUsage().getPercentUsage() + ", sent:  " + 
messagesSent);
+                return messagesSent.get() > 20;
+            }
+        }));
+
+
+        LOG.info("Consuming from big q to allow delivery to smaller q from 
pending");
+        int count = 0;
+
+        Message m = null;
+
+        for (;count < 10; count++) {
+            assertTrue((m = consumer.receive(messageReceiveTimeout)) != null);
+            LOG.info("Recieved Message (" + count + "):" + m + ", System 
Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
+            messagesConsumed.incrementAndGet();
+        }
+        consumer.close();
+
+        producingThread.join();
+        producingThreadTwo.join();
+
+        assertEquals("Incorrect number of Messages Sent: " + 
messagesSent.get(), messagesSent.get(), fillWithBigCount +  toSend*2);
+
+        // consume all little messages
+        consumer = consumerSession.createConsumer(destination);
+        for (count = 0;count < toSend*2; count++) {
+            assertTrue((m = consumer.receive(messageReceiveTimeout)) != null);
+            LOG.info("Recieved Message (" + count + "):" + m + ", System 
Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage() );
+            messagesConsumed.incrementAndGet();
+        }
+
+        assertEquals("Incorrect number of Messages consumed: " + 
messagesConsumed.get(), messagesSent.get(), messagesConsumed.get());
+
+        //assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+
+        Thread.setDefaultUncaughtExceptionHandler(this);
+        broker = new BrokerService();
+        broker.setDataDirectory("target" + File.separator + "activemq-data");
+        broker.setPersistent(true);
+        broker.setUseJmx(false);
+        broker.setAdvisorySupport(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+
+        setDefaultPersistenceAdapter(broker);
+        broker.getSystemUsage().getMemoryUsage().setLimit((30 * 16 * 1024));
+
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        defaultPolicy.setOptimizedDispatch(true);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(defaultPolicy);
+        broker.setDestinationPolicy(policyMap);
+
+        broker.addConnector("tcp://localhost:0");
+        broker.start();
+
+        connectionUri = 
broker.getTransportConnectors().get(0).getPublishableConnectString();
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+        LOG.error("Unexpected Unhandeled ex on: " + t, e);
+        exceptions.add(e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBrokerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBrokerTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBrokerTest.java
new file mode 100644
index 0000000..b229e0e
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageBrokerTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerTestSupport;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.util.IOHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+import java.io.File;
+
+public class MemoryUsageBrokerTest extends BrokerTestSupport {
+    private static final Logger LOG = 
LoggerFactory.getLogger(MemoryUsageBrokerTest.class);
+
+    protected void setUp() throws Exception {
+        this.setAutoFail(true);
+        super.setUp();
+    }
+
+    @Override
+    protected PolicyEntry getDefaultPolicy() {
+        PolicyEntry policy = super.getDefaultPolicy();
+        // Disable PFC and assign a large memory limit that's larger than the 
default broker memory limit for queues
+        policy.setProducerFlowControl(false);
+        policy.setQueue(">");
+        policy.setMemoryLimit(128 * 1024 * 1024);
+        return policy;
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        KahaDBStore kaha = new KahaDBStore();
+        File directory = new File("target/activemq-data/kahadb");
+        IOHelper.deleteChildren(directory);
+        kaha.setDirectory(directory);
+        kaha.deleteAllMessages();
+        broker.setPersistenceAdapter(kaha);
+        return broker;
+    }
+
+    protected ConnectionFactory createConnectionFactory() {
+        return new ActiveMQConnectionFactory(broker.getVmConnectorURI());
+    }
+
+    protected Connection createJmsConnection() throws JMSException {
+        return createConnectionFactory().createConnection();
+    }
+
+    public void testMemoryUsage() throws Exception {
+        Connection conn = createJmsConnection();
+        Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue("queue.a.b");
+        MessageProducer producer = session.createProducer(queue);
+        for (int i = 0; i < 100000; i++) {
+            BytesMessage bm = session.createBytesMessage();
+            bm.writeBytes(new byte[1024]);
+            producer.send(bm);
+            if ((i + 1) % 100 == 0) {
+                session.commit();
+                int memoryUsagePercent = 
broker.getSystemUsage().getMemoryUsage().getPercentUsage();
+                LOG.info((i + 1) + " messages have been sent; broker memory 
usage " + memoryUsagePercent + "%");
+                assertTrue("Used more than available broker memory", 
memoryUsagePercent <= 100);
+            }
+        }
+        session.commit();
+        producer.close();
+        session.close();
+        conn.close();
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageCleanupTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageCleanupTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageCleanupTest.java
new file mode 100644
index 0000000..e7feb90
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MemoryUsageCleanupTest.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MemoryUsageCleanupTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MemoryUsageCleanupTest.class);
+    private static final String QUEUE_NAME = 
MemoryUsageCleanupTest.class.getName() + "Queue";
+
+    private final String str = new String(
+        "QAa0bcLdUK2eHfJgTP8XhiFj61DOklNm9nBoI5pGqYVrs3CtSuMZvwWx4yE7zR");
+
+    private BrokerService broker;
+    private String connectionUri;
+    private ExecutorService pool;
+    private String queueName;
+    private Random r = new Random();
+
+    @Before
+    public void setUp() throws Exception {
+
+        broker = new BrokerService();
+        broker.setDataDirectory("target" + File.separator + "activemq-data");
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.setDedicatedTaskRunner(false);
+        broker.setAdvisorySupport(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+
+        SharedDeadLetterStrategy strategy = new SharedDeadLetterStrategy();
+        strategy.setProcessExpired(false);
+        strategy.setProcessNonPersistent(false);
+
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        defaultPolicy.setQueue(">");
+        defaultPolicy.setOptimizedDispatch(true);
+        defaultPolicy.setDeadLetterStrategy(strategy);
+        defaultPolicy.setMemoryLimit(300000000);
+
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.setDefaultEntry(defaultPolicy);
+
+        broker.setDestinationPolicy(policyMap);
+
+        broker.getSystemUsage().getMemoryUsage().setLimit(300000000L);
+
+        broker.addConnector("tcp://localhost:0").setName("Default");
+        broker.start();
+        broker.waitUntilStarted();
+
+        connectionUri = 
broker.getTransportConnectors().get(0).getPublishableConnectString();
+        pool = Executors.newFixedThreadPool(10);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+
+        if (pool != null) {
+            pool.shutdown();
+        }
+    }
+
+    @Test
+    public void testIt() throws Exception {
+
+        final int startPercentage = 
broker.getAdminView().getMemoryPercentUsage();
+        LOG.info("MemoryUseage at test start = " + startPercentage);
+
+        for (int i = 0; i < 2; i++) {
+            LOG.info("Started the test iteration: " + i + " using queueName = 
" + queueName);
+            queueName = QUEUE_NAME + i;
+            final CountDownLatch latch = new CountDownLatch(11);
+
+            pool.execute(new Runnable() {
+                @Override
+                public void run() {
+                    receiveAndDiscard100messages(latch);
+                }
+            });
+
+            for (int j = 0; j < 10; j++) {
+                pool.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        send10000messages(latch);
+                    }
+                });
+            }
+
+            LOG.info("Waiting on the send / receive latch");
+            latch.await(5, TimeUnit.MINUTES);
+            LOG.info("Resumed");
+
+            destroyQueue();
+            TimeUnit.SECONDS.sleep(2);
+        }
+
+        LOG.info("MemoryUseage before awaiting temp store cleanup = " + 
broker.getAdminView().getMemoryPercentUsage());
+
+        assertTrue("MemoryUsage should return to: " + startPercentage +
+                   "% but was " + 
broker.getAdminView().getMemoryPercentUsage() + "%", Wait.waitFor(new 
Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return broker.getAdminView().getMemoryPercentUsage() <= 
startPercentage + 1;
+            }
+        }));
+
+        int endPercentage = broker.getAdminView().getMemoryPercentUsage();
+        LOG.info("MemoryUseage at test end = " + endPercentage);
+    }
+
+    public void destroyQueue() {
+        try {
+            Broker broker = this.broker.getBroker();
+            if (!broker.isStopped()) {
+                LOG.info("Removing: " + queueName);
+                
broker.removeDestination(this.broker.getAdminConnectionContext(), new 
ActiveMQQueue(queueName), 10);
+            }
+        } catch (Exception e) {
+            LOG.warn("Got an error while removing the test queue", e);
+        }
+    }
+
+    private void send10000messages(CountDownLatch latch) {
+        ActiveMQConnection activeMQConnection = null;
+        try {
+            activeMQConnection = createConnection(null);
+            Session session = activeMQConnection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(session
+                    .createQueue(queueName));
+            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+            activeMQConnection.start();
+            for (int i = 0; i < 10000; i++) {
+                TextMessage textMessage = session.createTextMessage();
+                textMessage.setText(generateBody(1000));
+                textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                producer.send(textMessage);
+                try {
+                    Thread.sleep(10);
+                } catch (InterruptedException e) {
+                }
+            }
+            producer.close();
+        } catch (JMSException e) {
+            LOG.warn("Got an error while sending the messages", e);
+        } finally {
+            if (activeMQConnection != null) {
+                try {
+                    activeMQConnection.close();
+                } catch (JMSException e) {
+                }
+            }
+        }
+        latch.countDown();
+    }
+
+    private void receiveAndDiscard100messages(CountDownLatch latch) {
+        ActiveMQConnection activeMQConnection = null;
+        try {
+            activeMQConnection = createConnection(null);
+            Session session = activeMQConnection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer messageConsumer = session.createConsumer(
+                    session.createQueue(queueName));
+            activeMQConnection.start();
+            for (int i = 0; i < 100; i++) {
+                messageConsumer.receive();
+            }
+            messageConsumer.close();
+            LOG.info("Created and disconnected");
+        } catch (JMSException e) {
+            LOG.warn("Got an error while receiving the messages", e);
+        } finally {
+            if (activeMQConnection != null) {
+                try {
+                    activeMQConnection.close();
+                } catch (JMSException e) {
+                }
+            }
+        }
+        latch.countDown();
+    }
+
+    private ActiveMQConnection createConnection(String id) throws JMSException 
{
+        ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(connectionUri);
+        if (id != null) {
+            factory.setClientID(id);
+        }
+
+        ActiveMQConnection connection = (ActiveMQConnection) 
factory.createConnection();
+        return connection;
+    }
+
+    private String generateBody(int length) {
+
+        StringBuilder sb = new StringBuilder();
+        int te = 0;
+        for (int i = 1; i <= length; i++) {
+            te = r.nextInt(62);
+            sb.append(str.charAt(te));
+        }
+        return sb.toString();
+    }
+}

Reply via email to