http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 9c9e6f5..ebea686 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -19,6 +19,8 @@ package 
org.apache.activemq.artemis.tests.integration.management;
 import static 
org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
 
 import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
@@ -27,6 +29,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 
 import javax.json.JsonArray;
 import javax.json.JsonObject;
@@ -60,27 +63,46 @@ import 
org.apache.activemq.artemis.core.server.ActiveMQServers;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.junit.Wait;
 import 
org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
 import org.apache.activemq.artemis.utils.Base64;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(value = Parameterized.class)
 public class QueueControlTest extends ManagementTestBase {
 
    private ActiveMQServer server;
    private ClientSession session;
    private ServerLocator locator;
+   private final boolean durable;
+
+   @Parameterized.Parameters(name = "durable={0}")
+   public static Collection<Object[]> getParams() {
+      return Arrays.asList(new Object[][] {{true}, {false}});
+   }
+
+
+   /**
+    * @param durable
+    */
+   public QueueControlTest(boolean durable) {
+      super();
+      this.durable = durable;
+   }
+
 
    @Test
    public void testAttributes() throws Exception {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
       SimpleString filter = new SimpleString("color = 'blue'");
-      boolean durable = RandomUtil.randomBoolean();
 
-      session.createQueue(address, queue, filter, durable);
+      session.createQueue(address, RoutingType.MULTICAST, queue, filter, 
durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(queue.toString(), queueControl.getName());
@@ -97,7 +119,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(queue.toString(), queueControl.getName());
@@ -112,7 +134,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString queue = RandomUtil.randomSimpleString();
       final SimpleString deadLetterAddress = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertNull(queueControl.getDeadLetterAddress());
@@ -137,7 +159,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString queue = RandomUtil.randomSimpleString();
       String deadLetterAddress = RandomUtil.randomString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
 
@@ -155,7 +177,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString queue = RandomUtil.randomSimpleString();
       final SimpleString expiryAddress = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertNull(queueControl.getExpiryAddress());
@@ -180,7 +202,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString queue = RandomUtil.randomSimpleString();
       String expiryAddress = RandomUtil.randomString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
 
@@ -200,7 +222,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
 
@@ -220,7 +242,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
 
@@ -250,18 +272,18 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(0, getMessageCount(queueControl));
 
       ClientProducer producer = session.createProducer(address);
-      producer.send(session.createMessage(false));
-      Assert.assertEquals(1, getMessageCount(queueControl));
+      producer.send(session.createMessage(durable));
+      assertMessageMetrics(queueControl, 1, durable);
 
       consumeMessages(1, session, queue);
 
-      Assert.assertEquals(0, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 0, durable);
 
       session.deleteQueue(queue);
    }
@@ -271,7 +293,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(0, getMessageCount(queueControl));
@@ -302,15 +324,15 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(0, getMessagesAdded(queueControl));
 
       ClientProducer producer = session.createProducer(address);
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
       Assert.assertEquals(1, getMessagesAdded(queueControl));
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
       Assert.assertEquals(2, getMessagesAdded(queueControl));
 
       consumeMessages(2, session, queue);
@@ -325,7 +347,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, false);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
@@ -351,13 +373,13 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(0, queueControl.getScheduledCount());
 
       ClientProducer producer = session.createProducer(address);
-      ClientMessage message = session.createMessage(false);
+      ClientMessage message = session.createMessage(durable);
       message.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, 
System.currentTimeMillis() + delay);
       producer.send(message);
 
@@ -366,13 +388,17 @@ public class QueueControlTest extends ManagementTestBase {
          Thread.sleep(100);
       }
 
-      Assert.assertEquals(1, queueControl.getScheduledCount());
+      assertScheduledMetrics(queueControl, 1, durable);
+      assertMessageMetrics(queueControl, 1, durable);
+
       consumeMessages(0, session, queue);
 
       Thread.sleep(delay * 2);
 
       Assert.assertEquals(0, queueControl.getScheduledCount());
       consumeMessages(1, session, queue);
+      assertMessageMetrics(queueControl, 0, durable);
+      assertScheduledMetrics(queueControl, 0, durable);
 
       session.deleteQueue(queue);
    }
@@ -395,14 +421,14 @@ public class QueueControlTest extends ManagementTestBase {
          SimpleString address = RandomUtil.randomSimpleString();
          queue = RandomUtil.randomSimpleString();
 
-         transSession.createQueue(address, queue, null, false);
+         transSession.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
 
          final QueueControl queueControl = createManagementControl(address, 
queue);
 
          ClientProducer producer = transSession.createProducer(address);
 
          for (int i = 0; i < numMsg; i++) {
-            ClientMessage message = transSession.createMessage(false);
+            ClientMessage message = transSession.createMessage(durable);
             message.putIntProperty(new SimpleString("seqno"), i);
             producer.send(message);
          }
@@ -480,17 +506,17 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
       int intValue = RandomUtil.randomInt();
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
 
       Queue srvqueue = server.locateQueue(queue);
 
       QueueControl queueControl = createManagementControl(address, queue);
 
       ClientProducer producer = session.createProducer(address);
-      ClientMessage message = session.createMessage(false);
+      ClientMessage message = session.createMessage(durable);
       message.putIntProperty(new SimpleString("key"), intValue);
       producer.send(message);
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
 
       ClientConsumer consumer = session.createConsumer(queue);
       session.start();
@@ -525,20 +551,22 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
       int intValue = RandomUtil.randomInt();
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
 
       ClientProducer producer = session.createProducer(address);
-      ClientMessage message = session.createMessage(false);
+      ClientMessage message = session.createMessage(durable);
       message.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, 
System.currentTimeMillis() + delay);
       message.putIntProperty(new SimpleString("key"), intValue);
       producer.send(message);
       // unscheduled message
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
 
       Map<String, Object>[] messages = queueControl.listScheduledMessages();
       Assert.assertEquals(1, messages.length);
+      assertScheduledMetrics(queueControl, 1, durable);
+
       Assert.assertEquals(intValue, 
Integer.parseInt((messages[0].get("key")).toString()));
 
       Thread.sleep(delay + 500);
@@ -557,7 +585,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
       int intValue = RandomUtil.randomInt();
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
 
@@ -567,7 +595,7 @@ public class QueueControlTest extends ManagementTestBase {
       message.putIntProperty(new SimpleString("key"), intValue);
       producer.send(message);
       // unscheduled message
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
 
       String jsonString = queueControl.listScheduledMessagesAsJSON();
       Assert.assertNotNull(jsonString);
@@ -593,10 +621,10 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
 
       ClientProducer producer = session.createProducer(address);
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(0, queueControl.getDeliveringCount());
@@ -604,11 +632,11 @@ public class QueueControlTest extends ManagementTestBase {
       ClientConsumer consumer = session.createConsumer(queue);
       ClientMessage message = consumer.receive(500);
       Assert.assertNotNull(message);
-      Assert.assertEquals(1, queueControl.getDeliveringCount());
+      assertDeliveringMetrics(queueControl, 1, durable);
 
       message.acknowledge();
       session.commit();
-      Assert.assertEquals(0, queueControl.getDeliveringCount());
+      assertDeliveringMetrics(queueControl, 0, durable);
 
       consumer.close();
       session.deleteQueue(queue);
@@ -629,7 +657,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString queue = RandomUtil.randomSimpleString();
 
       try {
-         session.createQueue(address, RoutingType.ANYCAST, queue, null, false);
+         session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
 
          for (int i = 0; i < THREAD_COUNT; i++) {
             producerExecutor.submit(() -> {
@@ -665,7 +693,8 @@ public class QueueControlTest extends ManagementTestBase {
          producerCountDown.await(30, TimeUnit.SECONDS);
          consumerCountDown.await(30, TimeUnit.SECONDS);
 
-         QueueControl queueControl = createManagementControl(address, queue, 
RoutingType.ANYCAST);
+         QueueControl queueControl = createManagementControl(address, queue, 
RoutingType.MULTICAST);
+         Thread.sleep(200);
          Assert.assertEquals(0, queueControl.getMessageCount());
          Assert.assertEquals(0, queueControl.getConsumerCount());
          Assert.assertEquals(0, queueControl.getDeliveringCount());
@@ -694,12 +723,12 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
       int intValue = RandomUtil.randomInt();
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
 
       ClientProducer producer = session.createProducer(address);
-      ClientMessage message = session.createMessage(false);
+      ClientMessage message = session.createMessage(durable);
       message.putIntProperty(new SimpleString("key"), intValue);
       producer.send(message);
 
@@ -731,17 +760,18 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       QueueControl queueControl = createManagementControl(address, queue);
 
       ClientProducer producer = session.createProducer(address);
-      ClientMessage matchingMessage = session.createMessage(false);
+      ClientMessage matchingMessage = session.createMessage(durable);
       matchingMessage.putLongProperty(key, matchingValue);
       producer.send(matchingMessage);
-      ClientMessage unmatchingMessage = session.createMessage(false);
+      ClientMessage unmatchingMessage = session.createMessage(durable);
       unmatchingMessage.putLongProperty(key, unmatchingValue);
       producer.send(unmatchingMessage);
 
+      assertMessageMetrics(queueControl, 2, durable);
       Map<String, Object>[] messages = queueControl.listMessages(filter);
       Assert.assertEquals(1, messages.length);
       Assert.assertEquals(matchingValue, 
Long.parseLong(messages[0].get("key").toString()));
@@ -750,6 +780,7 @@ public class QueueControlTest extends ManagementTestBase {
 
       messages = queueControl.listMessages(filter);
       Assert.assertEquals(0, messages.length);
+      assertMessageMetrics(queueControl, 0, durable);
 
       session.deleteQueue(queue);
    }
@@ -759,12 +790,12 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       QueueControl queueControl = createManagementControl(address, queue);
 
       ClientProducer producer = session.createProducer(address);
-      producer.send(session.createMessage(false));
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
+      producer.send(session.createMessage(durable));
 
       Map<String, Object>[] messages = queueControl.listMessages(null);
       Assert.assertEquals(2, messages.length);
@@ -782,12 +813,12 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       QueueControl queueControl = createManagementControl(address, queue);
 
       ClientProducer producer = session.createProducer(address);
-      producer.send(session.createMessage(false));
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
+      producer.send(session.createMessage(durable));
 
       Map<String, Object>[] messages = queueControl.listMessages("");
       Assert.assertEquals(2, messages.length);
@@ -810,14 +841,14 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       QueueControl queueControl = createManagementControl(address, queue);
 
       ClientProducer producer = session.createProducer(address);
-      ClientMessage matchingMessage = session.createMessage(false);
+      ClientMessage matchingMessage = session.createMessage(durable);
       matchingMessage.putLongProperty(key, matchingValue);
       producer.send(matchingMessage);
-      ClientMessage unmatchingMessage = session.createMessage(false);
+      ClientMessage unmatchingMessage = session.createMessage(durable);
       unmatchingMessage.putLongProperty(key, unmatchingValue);
       producer.send(unmatchingMessage);
 
@@ -853,8 +884,8 @@ public class QueueControlTest extends ManagementTestBase {
       AddressSettings addressSettings = new 
AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
       server.getAddressSettingsRepository().addMatch(adName.toString(), 
addressSettings);
 
-      session.createQueue(dla, dlq, null, false);
-      session.createQueue(adName, qName, null, false);
+      session.createQueue(dla, RoutingType.MULTICAST, dlq, null, durable);
+      session.createQueue(adName, RoutingType.MULTICAST, qName, null, durable);
 
       // Send message to queue.
       ClientProducer producer = session.createProducer(adName);
@@ -874,7 +905,7 @@ public class QueueControlTest extends ManagementTestBase {
       Assert.assertNull(clientMessage);
 
       QueueControl queueControl = createManagementControl(dla, dlq);
-      Assert.assertEquals(1, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 1, durable);
       final long messageID = getFirstMessageId(queueControl);
 
       // Retry the message - i.e. it should go from DLQ to original Queue.
@@ -882,6 +913,7 @@ public class QueueControlTest extends ManagementTestBase {
 
       // Assert DLQ is empty...
       Assert.assertEquals(0, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 0, durable);
 
       // .. and that the message is now on the original queue once more.
       clientMessage = clientConsumer.receive(500);
@@ -909,8 +941,8 @@ public class QueueControlTest extends ManagementTestBase {
       
server.getAddressSettingsRepository().addMatch(forwardingAddress.toString(), 
addressSettings);
 
       // create target queue, DLQ and source topic
-      session.createQueue(dla, RoutingType.ANYCAST, dlq, null, false);
-      session.createQueue(forwardingAddress, RoutingType.ANYCAST, 
forwardingQueue, null, false);
+      session.createQueue(dla, RoutingType.MULTICAST, dlq, null, durable);
+      session.createQueue(forwardingAddress, RoutingType.MULTICAST, 
forwardingQueue, null, durable);
       session.createAddress(myTopic, RoutingType.MULTICAST, false);
 
       DivertConfiguration divert = new 
DivertConfiguration().setName("local-divert")
@@ -935,15 +967,16 @@ public class QueueControlTest extends ManagementTestBase {
       clientMessage = clientConsumer.receiveImmediate();
       Assert.assertNull(clientMessage);
 
-      QueueControl queueControl = createManagementControl(dla, dlq, 
RoutingType.ANYCAST);
-      Assert.assertEquals(1, getMessageCount(queueControl));
+      QueueControl queueControl = createManagementControl(dla, dlq, 
RoutingType.MULTICAST);
+      assertMessageMetrics(queueControl, 1, durable);
+
       final long messageID = getFirstMessageId(queueControl);
 
       // Retry the message - i.e. it should go from DLQ to original Queue.
       Assert.assertTrue(queueControl.retryMessage(messageID));
 
       // Assert DLQ is empty...
-      Assert.assertEquals(0, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 0, durable);
 
       // .. and that the message is now on the original queue once more.
       clientMessage = clientConsumer.receive(500);
@@ -970,8 +1003,8 @@ public class QueueControlTest extends ManagementTestBase {
       AddressSettings addressSettings = new 
AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
       server.getAddressSettingsRepository().addMatch(adName.toString(), 
addressSettings);
 
-      session.createQueue(dla, dlq, null, false);
-      session.createQueue(adName, qName, null, false);
+      session.createQueue(dla, RoutingType.MULTICAST, dlq, null, durable);
+      session.createQueue(adName, RoutingType.MULTICAST, qName, null, durable);
 
       // Send message to queue.
       ClientProducer producer = session.createProducer(adName);
@@ -1014,13 +1047,13 @@ public class QueueControlTest extends 
ManagementTestBase {
       assertTrue(queueMemorySize2.get() > 0);
 
       QueueControl dlqQueueControl = createManagementControl(dla, dlq);
-      Assert.assertEquals(numMessagesToTest, getMessageCount(dlqQueueControl));
+      assertMessageMetrics(dlqQueueControl, numMessagesToTest, durable);
 
       // Retry all messages - i.e. they should go from DLQ to original Queue.
       Assert.assertEquals(numMessagesToTest, dlqQueueControl.retryMessages());
 
       // Assert DLQ is empty...
-      Assert.assertEquals(0, getMessageCount(dlqQueueControl));
+      assertMessageMetrics(dlqQueueControl, 0, durable);
 
       //Verify that original queue has a memory size of greater than 0 and DLQ 
is 0 after move
       assertTrue(queueMemorySize1.get() > 0);
@@ -1056,12 +1089,12 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString otherAddress = RandomUtil.randomSimpleString();
       SimpleString otherQueue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
-      session.createQueue(otherAddress, otherQueue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
+      session.createQueue(otherAddress, RoutingType.MULTICAST, otherQueue, 
null, durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
-      ClientMessage message = session.createMessage(false);
+      ClientMessage message = session.createMessage(durable);
       SimpleString key = RandomUtil.randomSimpleString();
       long value = RandomUtil.randomLong();
       message.putLongProperty(key, value);
@@ -1076,7 +1109,7 @@ public class QueueControlTest extends ManagementTestBase {
       AtomicInteger queueMemorySize = (AtomicInteger) 
queueMemorySizeField.get(q);
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(1, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 1, durable);
 
       //verify memory usage is greater than 0
       Assert.assertTrue(queueMemorySize.get() > 0);
@@ -1084,7 +1117,7 @@ public class QueueControlTest extends ManagementTestBase {
       // moved all messages to otherQueue
       int movedMessagesCount = queueControl.moveMessages(null, 
otherQueue.toString());
       Assert.assertEquals(1, movedMessagesCount);
-      Assert.assertEquals(0, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 0, durable);
 
       //verify memory usage is 0 after move
       Assert.assertEquals(0, queueMemorySize.get());
@@ -1111,9 +1144,9 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString queueB = new SimpleString("B");
       SimpleString queueC = new SimpleString("C");
 
-      server.createQueue(address, RoutingType.MULTICAST, queueA, null, true, 
false);
-      server.createQueue(address, RoutingType.MULTICAST, queueB, null, true, 
false);
-      server.createQueue(address, RoutingType.MULTICAST, queueC, null, true, 
false);
+      server.createQueue(address, RoutingType.MULTICAST, queueA, null, 
durable, false);
+      server.createQueue(address, RoutingType.MULTICAST, queueB, null, 
durable, false);
+      server.createQueue(address, RoutingType.MULTICAST, queueC, null, 
durable, false);
 
 
       QueueControl queueControlA = createManagementControl(address, queueA);
@@ -1174,18 +1207,18 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString queue = RandomUtil.randomSimpleString();
       SimpleString unknownQueue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
-      ClientMessage message = session.createMessage(false);
+      ClientMessage message = session.createMessage(durable);
       SimpleString key = RandomUtil.randomSimpleString();
       long value = RandomUtil.randomLong();
       message.putLongProperty(key, value);
       producer.send(message);
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(1, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 1, durable);
 
       // moved all messages to unknown queue
       try {
@@ -1194,6 +1227,7 @@ public class QueueControlTest extends ManagementTestBase {
       } catch (Exception e) {
       }
       Assert.assertEquals(1, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 1, durable);
 
       consumeMessages(1, session, queue);
 
@@ -1208,7 +1242,6 @@ public class QueueControlTest extends ManagementTestBase {
     * <li>consume the message which <strong>did</strong> matches the filter 
from otherQueue</li>
     * </ol>
     */
-
    @Test
    public void testMoveMessagesWithFilter() throws Exception {
       SimpleString key = new SimpleString("key");
@@ -1220,20 +1253,20 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString otherAddress = RandomUtil.randomSimpleString();
       SimpleString otherQueue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
-      session.createQueue(otherAddress, otherQueue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
+      session.createQueue(otherAddress, RoutingType.MULTICAST,otherQueue, 
null, durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
-      ClientMessage matchingMessage = session.createMessage(false);
+      ClientMessage matchingMessage = session.createMessage(durable);
       matchingMessage.putLongProperty(key, matchingValue);
       producer.send(matchingMessage);
-      ClientMessage unmatchingMessage = session.createMessage(false);
+      ClientMessage unmatchingMessage = session.createMessage(durable);
       unmatchingMessage.putLongProperty(key, unmatchingValue);
       producer.send(unmatchingMessage);
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(2, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 2, durable);
 
       // moved matching messages to otherQueue
       int movedMatchedMessagesCount = queueControl.moveMessages(key + " =" + 
matchingValue, otherQueue.toString());
@@ -1267,18 +1300,18 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString otherAddress = RandomUtil.randomSimpleString();
       SimpleString otherQueue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
-      session.createQueue(otherAddress, otherQueue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
+      session.createQueue(otherAddress, RoutingType.MULTICAST, otherQueue, 
null, durable);
       ClientProducer producer = session.createProducer(address);
 
       // send 2 messages on queue
-      producer.send(session.createMessage(false));
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
+      producer.send(session.createMessage(durable));
 
       QueueControl queueControl = createManagementControl(address, queue);
       QueueControl otherQueueControl = createManagementControl(otherAddress, 
otherQueue);
-      Assert.assertEquals(2, getMessageCount(queueControl));
-      Assert.assertEquals(0, getMessageCount(otherQueueControl));
+      assertMessageMetrics(queueControl, 2, durable);
+      assertMessageMetrics(otherQueueControl, 0, durable);
 
       // the message IDs are set on the server
       Map<String, Object>[] messages = queueControl.listMessages(null);
@@ -1287,8 +1320,8 @@ public class QueueControlTest extends ManagementTestBase {
 
       boolean moved = queueControl.moveMessage(messageID, 
otherQueue.toString());
       Assert.assertTrue(moved);
-      Assert.assertEquals(1, getMessageCount(queueControl));
-      Assert.assertEquals(1, getMessageCount(otherQueueControl));
+      assertMessageMetrics(queueControl, 1, durable);
+      assertMessageMetrics(otherQueueControl, 1, durable);
 
       consumeMessages(1, session, queue);
       consumeMessages(1, session, otherQueue);
@@ -1303,14 +1336,14 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString queue = RandomUtil.randomSimpleString();
       SimpleString unknownQueue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       ClientProducer producer = session.createProducer(address);
 
       // send 2 messages on queue
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(1, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 1, durable);
 
       // the message IDs are set on the server
       Map<String, Object>[] messages = queueControl.listMessages(null);
@@ -1347,19 +1380,19 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
-      ClientMessage matchingMessage = session.createMessage(false);
+      ClientMessage matchingMessage = session.createMessage(durable);
       matchingMessage.putLongProperty(key, matchingValue);
       producer.send(matchingMessage);
-      ClientMessage unmatchingMessage = session.createMessage(false);
+      ClientMessage unmatchingMessage = session.createMessage(durable);
       unmatchingMessage.putLongProperty(key, unmatchingValue);
       producer.send(unmatchingMessage);
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(2, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 2, durable);
 
       // removed matching messages to otherQueue
       int removedMatchedMessagesCount = queueControl.removeMessages(key + " =" 
+ matchingValue);
@@ -1391,24 +1424,24 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
-      ClientMessage matchingMessage = session.createMessage(false);
+      ClientMessage matchingMessage = session.createMessage(durable);
       matchingMessage.putLongProperty(key, matchingValue);
       producer.send(matchingMessage);
-      ClientMessage unmatchingMessage = session.createMessage(false);
+      ClientMessage unmatchingMessage = session.createMessage(durable);
       unmatchingMessage.putLongProperty(key, unmatchingValue);
       producer.send(unmatchingMessage);
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(2, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 2, durable);
 
       // removed matching messages to otherQueue
       int removedMatchedMessagesCount = queueControl.removeMessages(5, key + " 
=" + matchingValue);
       Assert.assertEquals(1, removedMatchedMessagesCount);
-      Assert.assertEquals(1, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 1, durable);
 
       // consume the unmatched message from queue
       ClientConsumer consumer = session.createConsumer(queue);
@@ -1431,20 +1464,20 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
-      producer.send(session.createMessage(false));
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
+      producer.send(session.createMessage(durable));
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(2, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 2, durable);
 
       // removed matching messages to otherQueue
       int removedMatchedMessagesCount = queueControl.removeMessages(null);
       Assert.assertEquals(2, removedMatchedMessagesCount);
-      Assert.assertEquals(0, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 0, durable);
 
       session.deleteQueue(queue);
    }
@@ -1454,20 +1487,20 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
-      producer.send(session.createMessage(false));
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
+      producer.send(session.createMessage(durable));
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(2, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 2, durable);
 
       // removed matching messages to otherQueue
       int removedMatchedMessagesCount = queueControl.removeAllMessages();
       Assert.assertEquals(2, removedMatchedMessagesCount);
-      Assert.assertEquals(0, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 0, durable);
 
       session.deleteQueue(queue);
    }
@@ -1477,20 +1510,20 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
-      producer.send(session.createMessage(false));
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
+      producer.send(session.createMessage(durable));
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(2, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 2, durable);
 
       // removed matching messages to otherQueue
       int removedMatchedMessagesCount = queueControl.removeMessages("");
       Assert.assertEquals(2, removedMatchedMessagesCount);
-      Assert.assertEquals(0, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 0, durable);
 
       session.deleteQueue(queue);
    }
@@ -1500,15 +1533,15 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       ClientProducer producer = session.createProducer(address);
 
       // send 2 messages on queue
-      producer.send(session.createMessage(false));
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
+      producer.send(session.createMessage(durable));
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(2, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 2, durable);
 
       // the message IDs are set on the server
       Map<String, Object>[] messages = queueControl.listMessages(null);
@@ -1518,7 +1551,7 @@ public class QueueControlTest extends ManagementTestBase {
       // delete 1st message
       boolean deleted = queueControl.removeMessage(messageID);
       Assert.assertTrue(deleted);
-      Assert.assertEquals(1, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 1, durable);
 
       // check there is a single message to consume from queue
       consumeMessages(1, session, queue);
@@ -1531,15 +1564,15 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       ClientProducer producer = session.createProducer(address);
 
       // send 2 messages on queue, both scheduled
       long timeout = System.currentTimeMillis() + 5000;
-      ClientMessage m1 = session.createMessage(true);
+      ClientMessage m1 = session.createMessage(durable);
       m1.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, timeout);
       producer.send(m1);
-      ClientMessage m2 = session.createMessage(true);
+      ClientMessage m2 = session.createMessage(durable);
       m2.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, timeout);
       producer.send(m2);
 
@@ -1554,7 +1587,7 @@ public class QueueControlTest extends ManagementTestBase {
       // delete 1st message
       boolean deleted = queueControl.removeMessage(messageID);
       Assert.assertTrue(deleted);
-      Assert.assertEquals(1, queueControl.getScheduledCount());
+      assertScheduledMetrics(queueControl, 1, durable);
 
       // check there is a single message to consume from queue
       while (timeout > System.currentTimeMillis() && 
queueControl.getScheduledCount() == 1) {
@@ -1571,14 +1604,14 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       ClientProducer producer = session.createProducer(address);
 
       // send messages on queue
 
       for (int i = 0; i < 100; i++) {
 
-         ClientMessage msg = session.createMessage(false);
+         ClientMessage msg = session.createMessage(durable);
          msg.putIntProperty("count", i);
          producer.send(msg);
       }
@@ -1592,7 +1625,7 @@ public class QueueControlTest extends ManagementTestBase {
       }
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(100, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 100, durable);
 
       // the message IDs are set on the server
       Map<String, Object>[] messages = queueControl.listMessages(null);
@@ -1604,7 +1637,7 @@ public class QueueControlTest extends ManagementTestBase {
       // delete 1st message
       boolean deleted = queueControl.removeMessage(messageID);
       Assert.assertTrue(deleted);
-      Assert.assertEquals(99, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 99, durable);
 
       cons.close();
 
@@ -1623,13 +1656,13 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
-      ClientMessage matchingMessage = session.createMessage(false);
+      ClientMessage matchingMessage = session.createMessage(durable);
       matchingMessage.putLongProperty(key, matchingValue);
-      ClientMessage unmatchingMessage = session.createMessage(false);
+      ClientMessage unmatchingMessage = session.createMessage(durable);
       unmatchingMessage.putLongProperty(key, unmatchingValue);
       producer.send(matchingMessage);
       producer.send(unmatchingMessage);
@@ -1637,6 +1670,7 @@ public class QueueControlTest extends ManagementTestBase {
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(3, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 3, durable);
 
       Assert.assertEquals(2, queueControl.countMessages(key + " =" + 
matchingValue));
       Assert.assertEquals(1, queueControl.countMessages(key + " =" + 
unmatchingValue));
@@ -1653,18 +1687,18 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
       for (int i = 0; i < 100; i++) {
-         ClientMessage msg = session.createMessage(false);
+         ClientMessage msg = session.createMessage(durable);
          msg.putStringProperty(key, 
SimpleString.toSimpleString(matchingValue));
          producer.send(msg);
       }
 
       for (int i = 0; i < 10; i++) {
-         ClientMessage msg = session.createMessage(false);
+         ClientMessage msg = session.createMessage(durable);
          msg.putStringProperty(key, 
SimpleString.toSimpleString(nonMatchingValue));
          producer.send(msg);
       }
@@ -1679,7 +1713,7 @@ public class QueueControlTest extends ManagementTestBase {
       assertNull(consumer.receiveImmediate());
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(110, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 110, durable);
 
       Assert.assertEquals(0, queueControl.countMessages("nonExistentProperty 
like \'%Temp/88\'"));
 
@@ -1700,14 +1734,14 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
-      ClientMessage matchingMessage = session.createMessage(false);
+      ClientMessage matchingMessage = session.createMessage(durable);
       matchingMessage.putLongProperty(key, matchingValue);
       producer.send(matchingMessage);
-      ClientMessage unmatchingMessage = session.createMessage(false);
+      ClientMessage unmatchingMessage = session.createMessage(durable);
       unmatchingMessage.putLongProperty(key, unmatchingValue);
       producer.send(unmatchingMessage);
 
@@ -1716,7 +1750,7 @@ public class QueueControlTest extends ManagementTestBase {
 
       int expiredMessagesCount = queueControl.expireMessages(key + " =" + 
matchingValue);
       Assert.assertEquals(1, expiredMessagesCount);
-      Assert.assertEquals(1, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 1, durable);
 
       // consume the unmatched message from queue
       ClientConsumer consumer = session.createConsumer(queue);
@@ -1742,17 +1776,17 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString expiryAddress = RandomUtil.randomSimpleString();
       SimpleString expiryQueue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
-      session.createQueue(expiryAddress, expiryQueue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
+      session.createQueue(expiryAddress, RoutingType.MULTICAST, expiryQueue, 
null, durable);
       ClientProducer producer = session.createProducer(address);
 
       // send on queue
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
 
       QueueControl queueControl = createManagementControl(address, queue);
       QueueControl expiryQueueControl = createManagementControl(expiryAddress, 
expiryQueue);
-      Assert.assertEquals(1, getMessageCount(queueControl));
-      Assert.assertEquals(0, getMessageCount(expiryQueueControl));
+      assertMessageMetrics(queueControl, 1, durable);
+      assertMessageMetrics(expiryQueueControl, 0, durable);
 
       // the message IDs are set on the server
       Map<String, Object>[] messages = queueControl.listMessages(null);
@@ -1764,8 +1798,9 @@ public class QueueControlTest extends ManagementTestBase {
 
       boolean expired = queueControl.expireMessage(messageID);
       Assert.assertTrue(expired);
-      Assert.assertEquals(0, getMessageCount(queueControl));
-      Assert.assertEquals(1, getMessageCount(expiryQueueControl));
+      Thread.sleep(200);
+      assertMessageMetrics(queueControl, 0, durable);
+      assertMessageMetrics(expiryQueueControl, 1, durable);
 
       consumeMessages(0, session, queue);
       consumeMessages(1, session, expiryQueue);
@@ -1782,17 +1817,17 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString deadLetterAddress = RandomUtil.randomSimpleString();
       SimpleString deadLetterQueue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
-      session.createQueue(deadLetterAddress, deadLetterQueue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
+      session.createQueue(deadLetterAddress, RoutingType.MULTICAST, 
deadLetterQueue, null, durable);
       ClientProducer producer = session.createProducer(address);
 
       // send 2 messages on queue
-      producer.send(session.createMessage(false));
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
+      producer.send(session.createMessage(durable));
 
       QueueControl queueControl = createManagementControl(address, queue);
       QueueControl deadLetterQueueControl = 
createManagementControl(deadLetterAddress, deadLetterQueue);
-      Assert.assertEquals(2, getMessageCount(queueControl));
+      assertMessageMetrics(queueControl, 2, durable);
 
       // the message IDs are set on the server
       Map<String, Object>[] messages = queueControl.listMessages(null);
@@ -1805,8 +1840,9 @@ public class QueueControlTest extends ManagementTestBase {
       Assert.assertEquals(0, getMessageCount(deadLetterQueueControl));
       boolean movedToDeadLetterAddress = 
queueControl.sendMessageToDeadLetterAddress(messageID);
       Assert.assertTrue(movedToDeadLetterAddress);
-      Assert.assertEquals(1, getMessageCount(queueControl));
-      Assert.assertEquals(1, getMessageCount(deadLetterQueueControl));
+      assertMessageMetrics(queueControl, 1, durable);
+      Thread.sleep(200);
+      assertMessageMetrics(deadLetterQueueControl, 1, durable);
 
       // check there is a single message to consume from queue
       consumeMessages(1, session, queue);
@@ -1826,10 +1862,10 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       ClientProducer producer = session.createProducer(address);
 
-      ClientMessage message = session.createMessage(false);
+      ClientMessage message = session.createMessage(durable);
       message.setPriority(originalPriority);
       producer.send(message);
 
@@ -1860,10 +1896,10 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       ClientProducer producer = session.createProducer(address);
 
-      ClientMessage message = session.createMessage(false);
+      ClientMessage message = session.createMessage(durable);
       producer.send(message);
 
       QueueControl queueControl = createManagementControl(address, queue);
@@ -1894,7 +1930,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       QueueControl queueControl = createManagementControl(address, queue);
 
       ActiveMQServerControl serverControl = 
ManagementControlHelper.createActiveMQServerControl(mbeanServer);
@@ -1908,7 +1944,7 @@ public class QueueControlTest extends ManagementTestBase {
       Assert.assertEquals(0, info.getCount());
 
       ClientProducer producer = session.createProducer(address);
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
 
       Thread.sleep(200);
       jsonString = queueControl.listMessageCounter();
@@ -1918,7 +1954,7 @@ public class QueueControlTest extends ManagementTestBase {
       Assert.assertEquals(1, info.getCount());
       Assert.assertEquals(1, info.getCountDelta());
 
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
 
       Thread.sleep(200);
       jsonString = queueControl.listMessageCounter();
@@ -1946,7 +1982,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       QueueControl queueControl = createManagementControl(address, queue);
 
       ActiveMQServerControl serverControl = 
ManagementControlHelper.createActiveMQServerControl(mbeanServer);
@@ -1960,7 +1996,7 @@ public class QueueControlTest extends ManagementTestBase {
       Assert.assertEquals(0, info.getCount());
 
       ClientProducer producer = session.createProducer(address);
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
 
       Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);
       jsonString = queueControl.listMessageCounter();
@@ -1998,7 +2034,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       QueueControl queueControl = createManagementControl(address, queue);
 
       String history = queueControl.listMessageCounterAsHTML();
@@ -2013,7 +2049,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       QueueControl queueControl = createManagementControl(address, queue);
 
       ActiveMQServerControl serverControl = 
ManagementControlHelper.createActiveMQServerControl(mbeanServer);
@@ -2033,7 +2069,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
       QueueControl queueControl = createManagementControl(address, queue);
 
       ActiveMQServerControl serverControl = 
ManagementControlHelper.createActiveMQServerControl(mbeanServer);
@@ -2049,8 +2085,8 @@ public class QueueControlTest extends ManagementTestBase {
 
    @Test
    public void testMoveMessagesBack() throws Exception {
-      server.createQueue(new SimpleString("q1"), RoutingType.MULTICAST, new 
SimpleString("q1"), null, true, false);
-      server.createQueue(new SimpleString("q2"), RoutingType.MULTICAST, new 
SimpleString("q2"), null, true, false);
+      server.createQueue(new SimpleString("q1"), RoutingType.MULTICAST, new 
SimpleString("q1"), null, durable, false);
+      server.createQueue(new SimpleString("q2"), RoutingType.MULTICAST, new 
SimpleString("q2"), null, durable, false);
 
       ServerLocator locator = createInVMNonHALocator();
 
@@ -2061,7 +2097,7 @@ public class QueueControlTest extends ManagementTestBase {
       ClientProducer prod1 = session.createProducer("q1");
 
       for (int i = 0; i < 10; i++) {
-         ClientMessage msg = session.createMessage(true);
+         ClientMessage msg = session.createMessage(durable);
 
          msg.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, new 
SimpleString("dupl-" + i));
 
@@ -2113,8 +2149,8 @@ public class QueueControlTest extends ManagementTestBase {
 
    @Test
    public void testMoveMessagesBack2() throws Exception {
-      server.createQueue(new SimpleString("q1"), RoutingType.MULTICAST, new 
SimpleString("q1"), null, true, false);
-      server.createQueue(new SimpleString("q2"), RoutingType.MULTICAST, new 
SimpleString("q2"), null, true, false);
+      server.createQueue(new SimpleString("q1"), RoutingType.MULTICAST, new 
SimpleString("q1"), null, durable, false);
+      server.createQueue(new SimpleString("q2"), RoutingType.MULTICAST, new 
SimpleString("q2"), null, durable, false);
 
       ServerLocator locator = createInVMNonHALocator();
 
@@ -2127,7 +2163,7 @@ public class QueueControlTest extends ManagementTestBase {
       int NUMBER_OF_MSGS = 10;
 
       for (int i = 0; i < NUMBER_OF_MSGS; i++) {
-         ClientMessage msg = session.createMessage(true);
+         ClientMessage msg = session.createMessage(durable);
 
          msg.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, new 
SimpleString("dupl-" + i));
 
@@ -2191,7 +2227,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString queue = RandomUtil.randomSimpleString();
 
       try {
-         session.createQueue(address, queue, null, false);
+         session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
          QueueControl queueControl = createManagementControl(address, queue);
 
          ActiveMQServerControl serverControl = 
ManagementControlHelper.createActiveMQServerControl(mbeanServer);
@@ -2203,7 +2239,6 @@ public class QueueControlTest extends ManagementTestBase {
          queueControl.resume();
          Assert.assertFalse(queueControl.isPaused());
       } catch (Exception e) {
-         // TODO Auto-generated catch block
          e.printStackTrace();
       }
    }
@@ -2213,15 +2248,15 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(0, getMessagesAdded(queueControl));
 
       ClientProducer producer = session.createProducer(address);
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
       Assert.assertEquals(1, getMessagesAdded(queueControl));
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
       Assert.assertEquals(2, getMessagesAdded(queueControl));
 
       consumeMessages(2, session, queue);
@@ -2240,16 +2275,16 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
 
       ClientProducer producer = session.createProducer(address);
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
       consumeMessages(1, session, queue);
       Assert.assertEquals(1, queueControl.getMessagesAcknowledged());
-      producer.send(session.createMessage(false));
+      producer.send(session.createMessage(durable));
       consumeMessages(1, session, queue);
       Assert.assertEquals(2, queueControl.getMessagesAcknowledged());
 
@@ -2265,13 +2300,13 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(0, queueControl.getMessagesExpired());
 
       ClientProducer producer = session.createProducer(address);
-      ClientMessage message = session.createMessage(false);
+      ClientMessage message = session.createMessage(durable);
       producer.send(message);
 
       // the message IDs are set on the server
@@ -2282,7 +2317,7 @@ public class QueueControlTest extends ManagementTestBase {
       queueControl.expireMessage(messageID);
       Assert.assertEquals(1, queueControl.getMessagesExpired());
 
-      message = session.createMessage(false);
+      message = session.createMessage(durable);
       producer.send(message);
 
       // the message IDs are set on the server
@@ -2305,13 +2340,13 @@ public class QueueControlTest extends 
ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(0, queueControl.getMessagesExpired());
 
       ClientProducer producer = session.createProducer(address);
-      ClientMessage message = session.createMessage(false);
+      ClientMessage message = session.createMessage(durable);
       producer.send(message);
 
       // the message IDs are set on the server
@@ -2321,6 +2356,7 @@ public class QueueControlTest extends ManagementTestBase {
 
       queueControl.sendMessageToDeadLetterAddress(messageID);
       Assert.assertEquals(1, queueControl.getMessagesKilled());
+      assertMessageMetrics(queueControl, 0, durable);
 
       message = session.createMessage(false);
       producer.send(message);
@@ -2354,7 +2390,7 @@ public class QueueControlTest extends ManagementTestBase {
 
       SimpleString testQueueName = new SimpleString("newQueue");
       String testQueueName2 = "newQueue2";
-      this.server.createQueue(testQueueName, RoutingType.ANYCAST, 
testQueueName, null, false, false);
+      this.server.createQueue(testQueueName, RoutingType.MULTICAST, 
testQueueName, null, durable, false);
 
       Notification notif = listener.getNotification();
 
@@ -2369,7 +2405,7 @@ public class QueueControlTest extends ManagementTestBase {
 
       ActiveMQServerControl control = 
ManagementControlHelper.createActiveMQServerControl(mbeanServer);
 
-      control.createQueue(testQueueName2, testQueueName2);
+      control.createQueue(testQueueName2, testQueueName2, 
RoutingType.MULTICAST.toString());
 
       notif = listener.getNotification();
       System.out.println("got notif: " + notif);
@@ -2387,7 +2423,7 @@ public class QueueControlTest extends ManagementTestBase {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString queue = RandomUtil.randomSimpleString();
 
-      session.createQueue(address, queue, null, false);
+      session.createQueue(address, RoutingType.MULTICAST, queue, null, 
durable);
 
       QueueControl queueControl = createManagementControl(address, queue);
 
@@ -2438,4 +2474,46 @@ public class QueueControlTest extends ManagementTestBase 
{
       JsonObject object = (JsonObject) array.get(0);
       return object.getJsonNumber("messageID").longValue();
    }
+
+   protected void assertMessageMetrics(final QueueControl queueControl, long 
messageCount, boolean durable) throws Exception {
+      assertMetrics(queueControl, messageCount, durable, 
queueControl::getMessageCount,
+            queueControl::getPersistentSize, 
queueControl::getDurableMessageCount, queueControl::getDurablePersistentSize);
+   }
+
+   protected void assertScheduledMetrics(final QueueControl queueControl, long 
messageCount, boolean durable) throws Exception {
+      assertMetrics(queueControl, messageCount, durable, 
queueControl::getScheduledCount,
+            queueControl::getScheduledSize, 
queueControl::getDurableScheduledCount, queueControl::getDurableScheduledSize);
+   }
+
+   protected void assertDeliveringMetrics(final QueueControl queueControl, 
long messageCount, boolean durable) throws Exception {
+      assertMetrics(queueControl, messageCount, durable, 
queueControl::getDeliveringCount,
+            queueControl::getDeliveringSize, 
queueControl::getDurableDeliveringCount, 
queueControl::getDurableDeliveringSize);
+   }
+
+   protected void assertMetrics(final QueueControl queueControl, long 
messageCount, boolean durable,
+         Supplier<Number> count, Supplier<Number> size,
+         Supplier<Number>durableCount, Supplier<Number> durableSize) throws 
Exception {
+
+      //make sure count stat equals message count
+      Assert.assertTrue(Wait.waitFor(() -> count.get().longValue() == 
messageCount, 3, 100));
+
+      if (messageCount > 0) {
+         //verify size stat greater than 0
+         Assert.assertTrue(Wait.waitFor(() -> size.get().longValue() > 0, 3, 
100));
+
+         //If durable then make sure durable count and size are correct
+         if (durable) {
+            Assert.assertTrue(Wait.waitFor(() -> 
durableCount.get().longValue() == messageCount, 3, 100));
+            Assert.assertTrue(Wait.waitFor(() -> durableSize.get().longValue() 
> 0, 3, 100));
+         } else {
+            Assert.assertTrue(Wait.waitFor(() -> 
durableCount.get().longValue() == 0, 3, 100));
+            Assert.assertTrue(Wait.waitFor(() -> durableSize.get().longValue() 
== 0, 3, 100));
+         }
+      } else {
+         Assert.assertTrue(Wait.waitFor(() -> count.get().longValue() == 0, 3, 
100));
+         Assert.assertTrue(Wait.waitFor(() -> durableCount.get().longValue() 
== 0, 3, 100));
+         Assert.assertTrue(Wait.waitFor(() -> size.get().longValue() == 0, 3, 
100));
+         Assert.assertTrue(Wait.waitFor(() -> durableSize.get().longValue() == 
0, 3, 100));
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index 09621af..aafbb5b 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -16,16 +16,24 @@
  */
 package org.apache.activemq.artemis.tests.integration.management;
 
-import javax.management.openmbean.CompositeData;
 import java.util.HashMap;
 import java.util.Map;
 
+import javax.management.openmbean.CompositeData;
+
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(value = Parameterized.class)
 public class QueueControlUsingCoreTest extends QueueControlTest {
 
+   public QueueControlUsingCoreTest(boolean durable) {
+      super(durable);
+   }
+
    @Override
    protected QueueControl createManagementControl(final SimpleString address,
                                                   final SimpleString queue) 
throws Exception {
@@ -117,6 +125,21 @@ public class QueueControlUsingCoreTest extends 
QueueControlTest {
          }
 
          @Override
+         public long getDeliveringSize() {
+            return (Long) proxy.retrieveAttributeValue("deliveringSize", 
Long.class);
+         }
+
+         @Override
+         public int getDurableDeliveringCount() {
+            return (Integer) 
proxy.retrieveAttributeValue("durableDeliveringCount", Integer.class);
+         }
+
+         @Override
+         public long getDurableDeliveringSize() {
+            return (Long) 
proxy.retrieveAttributeValue("durableDeliveringSize", Long.class);
+         }
+
+         @Override
          public String getExpiryAddress() {
             return (String) proxy.retrieveAttributeValue("expiryAddress");
          }
@@ -187,6 +210,21 @@ public class QueueControlUsingCoreTest extends 
QueueControlTest {
          }
 
          @Override
+         public long getScheduledSize() {
+            return (Long) proxy.retrieveAttributeValue("scheduledSize", 
Long.class);
+         }
+
+         @Override
+         public long getDurableScheduledCount() {
+            return (Long) 
proxy.retrieveAttributeValue("durableScheduledCount", Long.class);
+         }
+
+         @Override
+         public long getDurableScheduledSize() {
+            return (Long) proxy.retrieveAttributeValue("durableScheduledSize", 
Long.class);
+         }
+
+         @Override
          public boolean isDurable() {
             return (Boolean) proxy.retrieveAttributeValue("durable");
          }
@@ -455,6 +493,21 @@ public class QueueControlUsingCoreTest extends 
QueueControlTest {
          public String listDeliveringMessagesAsJSON() throws Exception {
             return (String) 
proxy.invokeOperation("listDeliveringMessagesAsJSON");
          }
+
+         @Override
+         public long getPersistentSize() {
+            return (Long) proxy.retrieveAttributeValue("persistentSize", 
Long.class);
+         }
+
+         @Override
+         public long getDurableMessageCount() {
+            return (Long) proxy.retrieveAttributeValue("durableMessageCount", 
Long.class);
+         }
+
+         @Override
+         public long getDurablePersistentSize() {
+            return (Long) 
proxy.retrieveAttributeValue("durablePersistentSize", Long.class);
+         }
       };
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
index 6f47ba7..4714580 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingCounterTest.java
@@ -89,15 +89,17 @@ public class PagingCounterTest extends ActiveMQTestBase {
 
          Transaction tx = new TransactionImpl(server.getStorageManager());
 
-         counter.increment(tx, 1);
+         counter.increment(tx, 1, 1000);
 
          assertEquals(0, counter.getValue());
+         assertEquals(0, counter.getPersistentSize());
 
          tx.commit();
 
          storage.waitOnOperations();
 
          assertEquals(1, counter.getValue());
+         assertEquals(1000, counter.getPersistentSize());
       } finally {
          sf.close();
          session.close();
@@ -121,7 +123,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
 
          for (int i = 0; i < 2100; i++) {
 
-            counter.increment(tx, 1);
+            counter.increment(tx, 1, 1000);
 
             if (i % 200 == 0) {
                tx.commit();
@@ -129,6 +131,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
                storage.waitOnOperations();
 
                assertEquals(i + 1, counter.getValue());
+               assertEquals((i + 1) * 1000, counter.getPersistentSize());
 
                tx = new TransactionImpl(server.getStorageManager());
             }
@@ -139,6 +142,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
          storage.waitOnOperations();
 
          assertEquals(2100, counter.getValue());
+         assertEquals(2100 * 1000, counter.getPersistentSize());
 
          server.stop();
 
@@ -153,6 +157,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
          counter = locateCounter(queue);
 
          assertEquals(2100, counter.getValue());
+         assertEquals(2100 * 1000, counter.getPersistentSize());
 
       } finally {
          sf.close();
@@ -180,7 +185,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
 
          for (int i = 0; i < 2100; i++) {
 
-            counter.increment(tx, 1);
+            counter.increment(tx, 1, 1000);
 
             if (i % 200 == 0) {
                tx.commit();
@@ -188,6 +193,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
                storage.waitOnOperations();
 
                assertEquals(i + 1, counter.getValue());
+               assertEquals((i + 1) * 1000, counter.getPersistentSize());
 
                tx = new TransactionImpl(server.getStorageManager());
             }
@@ -198,6 +204,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
          storage.waitOnOperations();
 
          assertEquals(2100, counter.getValue());
+         assertEquals(2100 * 1000, counter.getPersistentSize());
 
          server.stop();
 
@@ -212,6 +219,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
          counter = locateCounter(queue);
 
          assertEquals(0, counter.getValue());
+         assertEquals(0, counter.getPersistentSize());
 
       } finally {
          sf.close();
@@ -230,15 +238,17 @@ public class PagingCounterTest extends ActiveMQTestBase {
 
       Transaction tx = new TransactionImpl(server.getStorageManager());
 
-      counter.increment(tx, 1);
+      counter.increment(tx, 1, 1000);
 
       assertEquals(0, counter.getValue());
+      assertEquals(0, counter.getPersistentSize());
 
       tx.commit();
 
       storage.waitOnOperations();
 
       assertEquals(1, counter.getValue());
+      assertEquals(1000, counter.getPersistentSize());
 
       sl.close();
 
@@ -255,6 +265,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
       counter = locateCounter(queue);
 
       assertEquals(1, counter.getValue());
+      assertEquals(1000, counter.getPersistentSize());
 
    }
 
@@ -283,7 +294,7 @@ public class PagingCounterTest extends ActiveMQTestBase {
       Transaction tx = new TransactionImpl(xid, server.getStorageManager(), 
300);
 
       for (int i = 0; i < 2000; i++) {
-         counter.increment(tx, 1);
+         counter.increment(tx, 1, 1000);
       }
 
       assertEquals(0, counter.getValue());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ea70af15/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/AbstractPersistentStatTestSupport.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/AbstractPersistentStatTestSupport.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/AbstractPersistentStatTestSupport.java
new file mode 100644
index 0000000..2247111
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/metrics/AbstractPersistentStatTestSupport.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.persistence.metrics;
+
+import java.util.Enumeration;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
+import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ *
+ */
+public abstract class AbstractPersistentStatTestSupport extends JMSTestBase {
+
+   protected static final Logger LOG = 
LoggerFactory.getLogger(AbstractPersistentStatTestSupport.class);
+
+   protected static int defaultMessageSize = 1000;
+
+   @Override
+   protected boolean usePersistence() {
+      return true;
+   }
+
+   protected void consumeTestQueueMessages(String queueName, int num) throws 
Exception {
+
+      // Start the connection
+      Connection connection = cf.createConnection();
+      connection.setClientID("clientId2" + queueName);
+      connection.start();
+      Session session = connection.createSession(false, 
QueueSession.AUTO_ACKNOWLEDGE);
+      Queue queue = session.createQueue(queueName);
+      MessageConsumer consumer;
+      try {
+         consumer = session.createConsumer(queue);
+         for (int i = 0; i < num; i++) {
+            consumer.receive();
+         }
+         consumer.close();
+      } finally {
+         // consumer.close();
+         connection.close();
+      }
+
+   }
+
+   protected void browseTestQueueMessages(String queueName) throws Exception {
+      // Start the connection
+      Connection connection = cf.createConnection();
+      connection.setClientID("clientId2" + queueName);
+      connection.start();
+      Session session = connection.createSession(false, 
QueueSession.AUTO_ACKNOWLEDGE);
+      Queue queue = session.createQueue(queueName);
+
+      try {
+         QueueBrowser queueBrowser = session.createBrowser(queue);
+         @SuppressWarnings("unchecked")
+         Enumeration<Message> messages = queueBrowser.getEnumeration();
+         while (messages.hasMoreElements()) {
+            messages.nextElement();
+         }
+
+      } finally {
+         connection.close();
+      }
+
+   }
+
+   protected void consumeDurableTestMessages(Connection connection, String 
sub, int size, String topicName,
+         AtomicLong publishedMessageSize) throws Exception {
+
+
+      Session session = connection.createSession(false, 
QueueSession.AUTO_ACKNOWLEDGE);
+      Topic topic = session.createTopic(topicName);
+
+      try {
+         TopicSubscriber consumer = session.createDurableSubscriber(topic, 
sub);
+         for (int i = 0; i < size; i++) {
+            ActiveMQMessage message = (ActiveMQMessage) consumer.receive();
+            if (publishedMessageSize != null) {
+               
publishedMessageSize.addAndGet(-message.getCoreMessage().getEncodeSize());
+            }
+         }
+
+      } finally {
+         session.close();
+      }
+
+   }
+
+   protected void publishTestQueueMessages(int count, String queueName, int 
deliveryMode, int messageSize,
+         AtomicLong publishedMessageSize, boolean transacted) throws Exception 
{
+
+      // Start the connection
+      Connection connection = cf.createConnection();
+      connection.setClientID("clientId" + queueName);
+      connection.start();
+      Session session = transacted ? connection.createSession(transacted, 
QueueSession.SESSION_TRANSACTED) :
+         connection.createSession(transacted, QueueSession.AUTO_ACKNOWLEDGE);
+      Queue queue = session.createQueue(queueName);
+
+      try {
+         MessageProducer prod = session.createProducer(queue);
+         prod.setDeliveryMode(deliveryMode);
+         for (int i = 0; i < count; i++) {
+            prod.send(createMessage(i, session, messageSize, 
publishedMessageSize));
+         }
+
+         if (transacted) {
+            session.commit();
+         }
+      } finally {
+         connection.close();
+      }
+   }
+
+   protected void publishTestMessagesDurable(Connection connection, String[] 
subNames, String topicName,
+         int publishSize, int expectedSize, int messageSize, AtomicLong 
publishedMessageSize, boolean verifyBrowsing,
+         boolean shared)
+         throws Exception {
+      this.publishTestMessagesDurable(connection, subNames, topicName, 
publishSize, expectedSize, messageSize,
+            publishedMessageSize, verifyBrowsing, DeliveryMode.PERSISTENT, 
shared);
+   }
+
+   protected void publishTestMessagesDurable(Connection connection, String[] 
subNames, String topicName,
+         int publishSize, int expectedSize, int messageSize, AtomicLong 
publishedMessageSize, boolean verifyBrowsing,
+         int deliveryMode, boolean shared) throws Exception {
+
+      Session session = connection.createSession(false, 
TopicSession.AUTO_ACKNOWLEDGE);
+      Topic topic = session.createTopic(topicName);
+      for (String subName : subNames) {
+         if (shared) {
+            session.createSharedDurableConsumer(topic, subName);
+         } else {
+            session.createDurableSubscriber(topic, subName);
+         }
+      }
+
+      try {
+         // publish a bunch of non-persistent messages to fill up the temp
+         // store
+         MessageProducer prod = session.createProducer(topic);
+         prod.setDeliveryMode(deliveryMode);
+         for (int i = 0; i < publishSize; i++) {
+            prod.send(createMessage(i, session, messageSize, 
publishedMessageSize));
+         }
+
+      } finally {
+         session.close();
+      }
+
+   }
+
+   /**
+    * Generate random messages between 100 bytes and maxMessageSize
+    *
+    * @param session
+    * @return
+    * @throws JMSException
+    * @throws ActiveMQException
+    */
+   protected BytesMessage createMessage(int count, Session session, int 
maxMessageSize, AtomicLong publishedMessageSize)
+         throws JMSException, ActiveMQException {
+      final ActiveMQBytesMessage message = (ActiveMQBytesMessage) 
session.createBytesMessage();
+
+      final Random randomSize = new Random();
+      int size = randomSize.nextInt((maxMessageSize - 100) + 1) + 100;
+      final byte[] data = new byte[size];
+      final Random rng = new Random();
+      rng.nextBytes(data);
+      message.writeBytes(data);
+      if (publishedMessageSize != null) {
+         
publishedMessageSize.addAndGet(message.getCoreMessage().getPersistentSize());
+      }
+
+      return message;
+   }
+}

Reply via email to