http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java
 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java
index 2091296..43e6010 100644
--- 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java
+++ 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java
@@ -118,44 +118,54 @@ public class ClusterConnectionControl2Test extends 
ManagementTestBase
       TransportConfiguration connectorConfig_1 = new 
TransportConfiguration(NETTY_CONNECTOR_FACTORY, acceptorParams_1);
       TransportConfiguration connectorConfig_0 = new 
TransportConfiguration(NETTY_CONNECTOR_FACTORY);
 
-      CoreQueueConfiguration queueConfig = new 
CoreQueueConfiguration(RandomUtil.randomString(),
-                                                              
RandomUtil.randomString(),
-                                                              null,
-                                                              false);
+      CoreQueueConfiguration queueConfig = new CoreQueueConfiguration()
+         .setAddress(RandomUtil.randomString())
+         .setName(RandomUtil.randomString())
+         .setDurable(false);
       List<String> connectorInfos = new ArrayList<String>();
       connectorInfos.add("netty");
-      BroadcastGroupConfiguration broadcastGroupConfig = new 
BroadcastGroupConfiguration(discoveryName,
-                                                                               
          250,
-                                                                               
          connectorInfos,
-                                             new 
UDPBroadcastGroupConfiguration(groupAddress, groupPort, null, -1));
-      DiscoveryGroupConfiguration discoveryGroupConfig = new 
DiscoveryGroupConfiguration(discoveryName,
-                                                                               
          0,
-                                                                               
          0,
-                                             new 
UDPBroadcastGroupConfiguration(groupAddress, groupPort, null, -1));
-
-      Configuration conf_1 = createBasicConfig();
-      conf_1.setSecurityEnabled(false);
-      conf_1.setJMXManagementEnabled(true);
-
-      clusterConnectionConfig_0 =
-               new ClusterConnectionConfiguration(clusterName, 
queueConfig.getAddress(), "netty", 1000, false, false,
-                                                  1, 1024,
-                                                  discoveryName);
-      conf_1.getClusterConfigurations().add(clusterConnectionConfig_0);
-      conf_1.getAcceptorConfigurations().add(acceptorConfig_1);
-      conf_1.getConnectorConfigurations().put("netty", connectorConfig_1);
-      conf_1.getQueueConfigurations().add(queueConfig);
-      conf_1.getDiscoveryGroupConfigurations().put(discoveryName, 
discoveryGroupConfig);
-      conf_1.getBroadcastGroupConfigurations().add(broadcastGroupConfig);
-
-      Configuration conf_0 = createBasicConfig(1);
-      conf_0.setSecurityEnabled(false);
-      conf_0.setJMXManagementEnabled(true);
-      conf_0.getAcceptorConfigurations().add(acceptorConfig_0);
-      conf_0.getConnectorConfigurations().put("netty", connectorConfig_0);
-      conf_0.getClusterConfigurations().add(clusterConnectionConfig_0);
-      conf_0.getDiscoveryGroupConfigurations().put(discoveryName, 
discoveryGroupConfig);
-      conf_0.getBroadcastGroupConfigurations().add(broadcastGroupConfig);
+
+      BroadcastGroupConfiguration broadcastGroupConfig = new 
BroadcastGroupConfiguration()
+         .setName(discoveryName)
+         .setBroadcastPeriod(250)
+         .setConnectorInfos(connectorInfos)
+         .setEndpointFactoryConfiguration(new UDPBroadcastGroupConfiguration()
+            .setGroupAddress(groupAddress)
+            .setGroupPort(groupPort));
+
+      DiscoveryGroupConfiguration discoveryGroupConfig = new 
DiscoveryGroupConfiguration()
+         .setName(discoveryName)
+         .setRefreshTimeout(0)
+         .setDiscoveryInitialWaitTimeout(0)
+         .setBroadcastEndpointFactoryConfiguration(new 
UDPBroadcastGroupConfiguration()
+            .setGroupAddress(groupAddress)
+            .setGroupPort(groupPort));
+
+      clusterConnectionConfig_0 = new ClusterConnectionConfiguration()
+         .setName(clusterName)
+         .setAddress(queueConfig.getAddress())
+         .setConnectorName("netty")
+         .setRetryInterval(1000)
+         .setDuplicateDetection(false)
+         .setForwardWhenNoConsumers(false)
+         .setMaxHops(1)
+         .setConfirmationWindowSize(1024)
+         .setDiscoveryGroupName(discoveryName);
+
+      Configuration conf_1 = createBasicConfig()
+         .addClusterConfiguration(clusterConnectionConfig_0)
+         .addAcceptorConfiguration(acceptorConfig_1)
+         .addConnectorConfiguration("netty", connectorConfig_1)
+         .addQueueConfiguration(queueConfig)
+         .addDiscoveryGroupConfiguration(discoveryName, discoveryGroupConfig)
+         .addBroadcastGroupConfiguration(broadcastGroupConfig);
+
+      Configuration conf_0 = createBasicConfig(1)
+         .addClusterConfiguration(clusterConnectionConfig_0)
+         .addAcceptorConfiguration(acceptorConfig_0)
+         .addConnectorConfiguration("netty", connectorConfig_0)
+         .addDiscoveryGroupConfiguration(discoveryName, discoveryGroupConfig)
+         .addBroadcastGroupConfiguration(broadcastGroupConfig);
 
       mbeanServer_1 = MBeanServerFactory.createMBeanServer();
       server1 = addServer(HornetQServers.newHornetQServer(conf_1, 
mbeanServer_1, false));

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
index 7a9284f..79985af 100644
--- 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
+++ 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
@@ -13,7 +13,6 @@
 package org.hornetq.tests.integration.management;
 import org.junit.Before;
 import org.junit.After;
-
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -25,13 +24,12 @@ import javax.management.MBeanServer;
 import javax.management.MBeanServerFactory;
 
 import org.junit.Assert;
-
 import org.hornetq.api.core.DiscoveryGroupConfiguration;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.UDPBroadcastGroupConfiguration;
 import org.hornetq.api.core.management.ClusterConnectionControl;
-import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.api.core.management.CoreNotificationType;
 import org.hornetq.api.core.management.ObjectNameBuilder;
 import org.hornetq.core.config.ClusterConnectionConfiguration;
 import org.hornetq.core.config.Configuration;
@@ -167,8 +165,8 @@ public class ClusterConnectionControlTest extends 
ManagementTestBase
       clusterConnectionControl.stop();
 
       Assert.assertTrue(notifListener.getNotifications().size() > 0);
-      Notification notif = notifListener.getNotifications().get(0);
-      Assert.assertEquals(NotificationType.CLUSTER_CONNECTION_STOPPED, 
notif.getType());
+      Notification notif = 
getFirstNotificationOfType(notifListener.getNotifications(), 
CoreNotificationType.CLUSTER_CONNECTION_STOPPED);
+      Assert.assertNotNull(notif);
       Assert.assertEquals(clusterConnectionControl.getName(), 
notif.getProperties()
                                                                    
.getSimpleStringProperty(new SimpleString("name"))
                                                                    
.toString());
@@ -176,13 +174,31 @@ public class ClusterConnectionControlTest extends 
ManagementTestBase
       clusterConnectionControl.start();
 
       Assert.assertTrue(notifListener.getNotifications().size() > 0);
-      notif = notifListener.getNotifications().get(1);
-      Assert.assertEquals(NotificationType.CLUSTER_CONNECTION_STARTED, 
notif.getType());
+      notif = getFirstNotificationOfType(notifListener.getNotifications(), 
CoreNotificationType.CLUSTER_CONNECTION_STARTED);
+      Assert.assertNotNull(notif);
       Assert.assertEquals(clusterConnectionControl.getName(), 
notif.getProperties()
                                                                    
.getSimpleStringProperty(new SimpleString("name"))
                                                                    
.toString());
    }
 
+   /**
+    * Returns the earliest notification of the given type, or {@code null} when the list holds none.
+    *
+    * @param notifications the notifications collected so far; copied before iteration
+    * @param type the notification type to look for
+    * @return the first matching notification in list order, or {@code null} if there is no match
+    */
+   private Notification getFirstNotificationOfType(List<Notification> notifications, CoreNotificationType type)
+   {
+      // the notifications can change while we're looping, so iterate over a snapshot
+      List<Notification> notificationsClone = new ArrayList<>(notifications);
+
+      for (Notification notification : notificationsClone)
+      {
+         if (notification.getType().equals(type))
+         {
+            // stop at the first match; previously the loop kept going and handed back the last one
+            return notification;
+         }
+      }
+
+      return null;
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -203,46 +219,55 @@ public class ClusterConnectionControlTest extends 
ManagementTestBase
                                                                           
acceptorParams,
                                                                           
RandomUtil.randomString());
 
-      CoreQueueConfiguration queueConfig = new 
CoreQueueConfiguration(RandomUtil.randomString(),
-                                                              
RandomUtil.randomString(),
-                                                              null,
-                                                              false);
+      CoreQueueConfiguration queueConfig = new CoreQueueConfiguration()
+         .setAddress(RandomUtil.randomString())
+         .setName(RandomUtil.randomString())
+         .setDurable(false);
       List<String> connectors = new ArrayList<String>();
       connectors.add(connectorConfig.getName());
 
 
       String discoveryGroupName = RandomUtil.randomString();
-      DiscoveryGroupConfiguration discoveryGroupConfig =
-               new DiscoveryGroupConfiguration(discoveryGroupName, 500, 0,
-                     new UDPBroadcastGroupConfiguration("230.1.2.3", 6745, 
null, -1));
-
-      Configuration conf_1 = createBasicConfig();
-      conf_1.setSecurityEnabled(false);
-      conf_1.setJMXManagementEnabled(true);
-      conf_1.getAcceptorConfigurations().add(acceptorConfig);
-      conf_1.getQueueConfigurations().add(queueConfig);
-
-      Configuration conf_0 = createBasicConfig();
-      clusterConnectionConfig1 =
-               new ClusterConnectionConfiguration(RandomUtil.randomString(), 
queueConfig.getAddress(),
-                                                  connectorConfig.getName(), 
RandomUtil.randomPositiveLong(),
-                                                  RandomUtil.randomBoolean(), 
RandomUtil.randomBoolean(),
-                                                  
RandomUtil.randomPositiveInt(), RandomUtil.randomPositiveInt(),
-                                                  connectors, false);
-      clusterConnectionConfig2 =
-               new ClusterConnectionConfiguration(RandomUtil.randomString(), 
queueConfig.getAddress(),
-                                                  connectorConfig.getName(), 
RandomUtil.randomPositiveLong(),
-                                                  RandomUtil.randomBoolean(), 
RandomUtil.randomBoolean(),
-                                                  
RandomUtil.randomPositiveInt(), RandomUtil.randomPositiveInt(),
-                                                  discoveryGroupName);
-
-      conf_0.setSecurityEnabled(false);
-      conf_0.setJMXManagementEnabled(true);
-      conf_0.getAcceptorConfigurations().add(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()));
-      conf_0.getConnectorConfigurations().put(connectorConfig.getName(), 
connectorConfig);
-      conf_0.getClusterConfigurations().add(clusterConnectionConfig1);
-      conf_0.getClusterConfigurations().add(clusterConnectionConfig2);
-      conf_0.getDiscoveryGroupConfigurations().put(discoveryGroupName, 
discoveryGroupConfig);
+      DiscoveryGroupConfiguration discoveryGroupConfig = new 
DiscoveryGroupConfiguration()
+         .setName(discoveryGroupName)
+         .setRefreshTimeout(500)
+         .setDiscoveryInitialWaitTimeout(0)
+         .setBroadcastEndpointFactoryConfiguration(new 
UDPBroadcastGroupConfiguration()
+                                                      
.setGroupAddress("230.1.2.3")
+                                                      .setGroupPort(6745));
+
+      Configuration conf_1 = createBasicConfig()
+         .addAcceptorConfiguration(acceptorConfig)
+         .addQueueConfiguration(queueConfig);
+
+      clusterConnectionConfig1 = new ClusterConnectionConfiguration()
+         .setName(RandomUtil.randomString())
+         .setAddress(queueConfig.getAddress())
+         .setConnectorName(connectorConfig.getName())
+         .setRetryInterval(RandomUtil.randomPositiveLong())
+         .setDuplicateDetection(RandomUtil.randomBoolean())
+         .setForwardWhenNoConsumers(RandomUtil.randomBoolean())
+         .setMaxHops(RandomUtil.randomPositiveInt())
+         .setConfirmationWindowSize(RandomUtil.randomPositiveInt())
+         .setStaticConnectors(connectors);
+
+      clusterConnectionConfig2 = new ClusterConnectionConfiguration()
+         .setName(RandomUtil.randomString())
+         .setAddress(queueConfig.getAddress())
+         .setConnectorName(connectorConfig.getName())
+         .setRetryInterval(RandomUtil.randomPositiveLong())
+         .setDuplicateDetection(RandomUtil.randomBoolean())
+         .setForwardWhenNoConsumers(RandomUtil.randomBoolean())
+         .setMaxHops(RandomUtil.randomPositiveInt())
+         .setConfirmationWindowSize(RandomUtil.randomPositiveInt())
+         .setDiscoveryGroupName(discoveryGroupName);
+
+      Configuration conf_0 = createBasicConfig()
+         .addAcceptorConfiguration(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()))
+         .addConnectorConfiguration(connectorConfig.getName(), connectorConfig)
+         .addClusterConfiguration(clusterConnectionConfig1)
+         .addClusterConfiguration(clusterConnectionConfig2)
+         .addDiscoveryGroupConfiguration(discoveryGroupName, 
discoveryGroupConfig);
 
       mbeanServer_1 = MBeanServerFactory.createMBeanServer();
       server_1 = addServer(HornetQServers.newHornetQServer(conf_1, 
mbeanServer_1, false));

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/DivertControlTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/DivertControlTest.java
 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/DivertControlTest.java
index 1998847..ef1e312 100644
--- 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/DivertControlTest.java
+++ 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/DivertControlTest.java
@@ -87,31 +87,28 @@ public class DivertControlTest extends ManagementTestBase
 
       TransportConfiguration connectorConfig = new 
TransportConfiguration(InVMConnectorFactory.class.getName());
 
-      CoreQueueConfiguration queueConfig = new 
CoreQueueConfiguration(RandomUtil.randomString(),
-                                                              
RandomUtil.randomString(),
-                                                              null,
-                                                              false);
-      CoreQueueConfiguration forwardQueueConfig = new 
CoreQueueConfiguration(RandomUtil.randomString(),
-                                                                    
RandomUtil.randomString(),
-                                                                    null,
-                                                                    false);
-
-      divertConfig = new DivertConfiguration(RandomUtil.randomString(),
-                                             RandomUtil.randomString(),
-                                             queueConfig.getAddress(),
-                                             forwardQueueConfig.getAddress(),
-                                             RandomUtil.randomBoolean(),
-                                             null,
-                                             null);
-      Configuration conf = createBasicConfig();
-      conf.setSecurityEnabled(false);
-      conf.setJMXManagementEnabled(true);
-      conf.getQueueConfigurations().add(queueConfig);
-      conf.getQueueConfigurations().add(forwardQueueConfig);
-      conf.getDivertConfigurations().add(divertConfig);
-
-      conf.getAcceptorConfigurations().add(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()));
-      conf.getConnectorConfigurations().put(connectorConfig.getName(), 
connectorConfig);
+      CoreQueueConfiguration queueConfig = new CoreQueueConfiguration()
+         .setAddress(RandomUtil.randomString())
+         .setName(RandomUtil.randomString())
+         .setDurable(false);
+      CoreQueueConfiguration forwardQueueConfig = new CoreQueueConfiguration()
+         .setAddress(RandomUtil.randomString())
+         .setName(RandomUtil.randomString())
+         .setDurable(false);
+
+      divertConfig = new DivertConfiguration()
+         .setName(RandomUtil.randomString())
+         .setRoutingName(RandomUtil.randomString())
+         .setAddress(queueConfig.getAddress())
+         .setForwardingAddress(forwardQueueConfig.getAddress())
+         .setExclusive(RandomUtil.randomBoolean());
+
+      Configuration conf = createBasicConfig()
+         .addQueueConfiguration(queueConfig)
+         .addQueueConfiguration(forwardQueueConfig)
+         .addDivertConfiguration(divertConfig)
+         .addAcceptorConfiguration(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()))
+         .addConnectorConfiguration(connectorConfig.getName(), 
connectorConfig);
 
       service = HornetQServers.newHornetQServer(conf, mbeanServer, false);
       service.start();

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/HornetQServerControlTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/HornetQServerControlTest.java
 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/HornetQServerControlTest.java
index 00d05d6..12add43 100644
--- 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/HornetQServerControlTest.java
+++ 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/HornetQServerControlTest.java
@@ -40,6 +40,7 @@ import 
org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
 import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.HornetQServers;
+import org.hornetq.core.settings.impl.SlowConsumerPolicy;
 import org.hornetq.core.transaction.impl.XidImpl;
 import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.tests.util.UnitTestCase;
@@ -101,8 +102,6 @@ public class HornetQServerControlTest extends 
ManagementTestBase
       Assert.assertEquals(conf.isClustered(), serverControl.isClustered());
       Assert.assertEquals(conf.isPersistDeliveryCountBeforeDelivery(),
                           
serverControl.isPersistDeliveryCountBeforeDelivery());
-      Assert.assertEquals(conf.getHAPolicy().isBackup(), 
serverControl.isBackup());
-      Assert.assertEquals(conf.getHAPolicy().isSharedStore(), 
serverControl.isSharedStore());
       Assert.assertEquals(conf.getScheduledThreadPoolMaxSize(), 
serverControl.getScheduledThreadPoolMaxSize());
       Assert.assertEquals(conf.getThreadPoolMaxSize(), 
serverControl.getThreadPoolMaxSize());
       Assert.assertEquals(conf.getSecurityInvalidationInterval(), 
serverControl.getSecurityInvalidationInterval());
@@ -144,7 +143,6 @@ public class HornetQServerControlTest extends 
ManagementTestBase
       Assert.assertEquals(conf.getJournalCompactMinFiles(), 
serverControl.getJournalCompactMinFiles());
       Assert.assertEquals(conf.getJournalCompactPercentage(), 
serverControl.getJournalCompactPercentage());
       Assert.assertEquals(conf.isPersistenceEnabled(), 
serverControl.isPersistenceEnabled());
-      Assert.assertEquals(conf.getHAPolicy().isFailoverOnServerShutdown(), 
serverControl.isFailoverOnServerShutdown());
    }
 
    @Test
@@ -502,6 +500,9 @@ public class HornetQServerControlTest extends 
ManagementTestBase
       long redistributionDelay = 5;
       boolean sendToDLAOnNoRoute = true;
       String addressFullMessagePolicy = "PAGE";
+      long slowConsumerThreshold = 5;
+      long slowConsumerCheckPeriod = 10;
+      String slowConsumerPolicy = SlowConsumerPolicy.KILL.toString();
 
       serverControl.addAddressSettings(addressMatch,
                                        DLA,
@@ -517,7 +518,10 @@ public class HornetQServerControlTest extends 
ManagementTestBase
                                        maxRedeliveryDelay,
                                        redistributionDelay,
                                        sendToDLAOnNoRoute,
-                                       addressFullMessagePolicy);
+                                       addressFullMessagePolicy,
+                                       slowConsumerThreshold,
+                                       slowConsumerCheckPeriod,
+                                       slowConsumerPolicy);
 
 
       boolean ex = false;
@@ -537,7 +541,10 @@ public class HornetQServerControlTest extends 
ManagementTestBase
                                           maxRedeliveryDelay,
                                           redistributionDelay,
                                           sendToDLAOnNoRoute,
-                                          addressFullMessagePolicy);
+                                          addressFullMessagePolicy,
+                                          slowConsumerThreshold,
+                                          slowConsumerCheckPeriod,
+                                          slowConsumerPolicy);
       }
       catch (Exception expected)
       {
@@ -564,6 +571,9 @@ public class HornetQServerControlTest extends 
ManagementTestBase
       assertEquals(redistributionDelay, info.getRedistributionDelay());
       assertEquals(sendToDLAOnNoRoute, info.isSendToDLAOnNoRoute());
       assertEquals(addressFullMessagePolicy, 
info.getAddressFullMessagePolicy());
+      assertEquals(slowConsumerThreshold, info.getSlowConsumerThreshold());
+      assertEquals(slowConsumerCheckPeriod, info.getSlowConsumerCheckPeriod());
+      assertEquals(slowConsumerPolicy, info.getSlowConsumerPolicy());
 
       serverControl.addAddressSettings(addressMatch,
                                        DLA,
@@ -579,7 +589,10 @@ public class HornetQServerControlTest extends 
ManagementTestBase
                                        maxRedeliveryDelay,
                                        redistributionDelay,
                                        sendToDLAOnNoRoute,
-                                       addressFullMessagePolicy);
+                                       addressFullMessagePolicy,
+                                       slowConsumerThreshold,
+                                       slowConsumerCheckPeriod,
+                                       slowConsumerPolicy);
 
 
       jsonString = serverControl.getAddressSettingsAsJSON(exactAddress);
@@ -598,6 +611,9 @@ public class HornetQServerControlTest extends 
ManagementTestBase
       assertEquals(redistributionDelay, info.getRedistributionDelay());
       assertEquals(sendToDLAOnNoRoute, info.isSendToDLAOnNoRoute());
       assertEquals(addressFullMessagePolicy, 
info.getAddressFullMessagePolicy());
+      assertEquals(slowConsumerThreshold, info.getSlowConsumerThreshold());
+      assertEquals(slowConsumerCheckPeriod, info.getSlowConsumerCheckPeriod());
+      assertEquals(slowConsumerPolicy, info.getSlowConsumerPolicy());
 
 
       ex = false;
@@ -617,7 +633,10 @@ public class HornetQServerControlTest extends 
ManagementTestBase
                                           maxRedeliveryDelay,
                                           redistributionDelay,
                                           sendToDLAOnNoRoute,
-                                          addressFullMessagePolicy);
+                                          addressFullMessagePolicy,
+                                          slowConsumerThreshold,
+                                          slowConsumerCheckPeriod,
+                                          slowConsumerPolicy);
       }
       catch (Exception e)
       {
@@ -948,8 +967,85 @@ public class HornetQServerControlTest extends 
ManagementTestBase
 
       
System.out.println("HornetQServerControlTest.testCommitPreparedTransactions");
    }
-   // Package protected ---------------------------------------------
 
+   @Test
+   public void testScaleDownWithConnector() throws Exception
+   {
+      // scale down by naming the connector explicitly
+      ScaleDownHandler viaNamedConnector = new ScaleDownHandler()
+      {
+         @Override
+         public void scaleDown(HornetQServerControl control) throws Exception
+         {
+            control.scaleDown("server2-connector");
+         }
+      };
+
+      scaleDown(viaNamedConnector);
+   }
+
+   @Test
+   public void testScaleDownWithOutConnector() throws Exception
+   {
+      // scale down without supplying a connector name (null is passed through as-is)
+      ScaleDownHandler withoutConnector = new ScaleDownHandler()
+      {
+         @Override
+         public void scaleDown(HornetQServerControl control) throws Exception
+         {
+            control.scaleDown(null);
+         }
+      };
+
+      scaleDown(withoutConnector);
+   }
+
+   /**
+    * Shared body of the scale-down tests: starts a second in-VM server (server-id 2), sends 100
+    * messages to a queue on the first server, lets {@code handler} trigger the scale down and then
+    * expects to receive 100 messages from the same queue on the second server.
+    *
+    * @param handler callback that issues the actual scale-down call on the management control
+    * @throws Exception if server setup, messaging or the scale-down call fails
+    */
+   protected void scaleDown(ScaleDownHandler handler) throws Exception
+   {
+      SimpleString address = new SimpleString("testQueue");
+      Configuration conf = createDefaultConfig(false, 2);
+      conf.setSecurityEnabled(false);
+      conf.getAcceptorConfigurations().clear();
+      HashMap<String, Object> params = new HashMap<String, Object>();
+      params.put("server-id", "2");
+      conf.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName(), params));
+      HornetQServer server2 = HornetQServers.newHornetQServer(conf, null, true);
+      // register the second server on the first server's configuration under the name the tests use
+      this.conf.getConnectorConfigurations().clear();
+      this.conf.getConnectorConfigurations().put("server2-connector", new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY, params));
+      // tracks whichever locator is currently open so the finally block can always release it
+      ServerLocator locator = null;
+      try
+      {
+         server2.start();
+         server.createQueue(address, address, null, true, false);
+         server2.createQueue(address, address, null, true, false);
+         locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
+         ClientSessionFactory csf = createSessionFactory(locator);
+         ClientSession session = csf.createSession();
+         ClientProducer producer = session.createProducer(address);
+         for (int i = 0; i < 100; i++)
+         {
+            ClientMessage message = session.createMessage(true);
+            message.getBodyBuffer().writeString("m" + i);
+            producer.send(message);
+         }
+
+         HornetQServerControl managementControl = createManagementControl();
+         handler.scaleDown(managementControl);
+         locator.close();
+         // already closed: make sure the finally block does not close it a second time
+         locator = null;
+
+         // reconnect, this time to the second server, and drain the queue there
+         locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY, params));
+         csf = createSessionFactory(locator);
+         session = csf.createSession();
+         session.start();
+         ClientConsumer consumer = session.createConsumer(address);
+         for (int i = 0; i < 100; i++)
+         {
+            ClientMessage m = consumer.receive(5000);
+            assertNotNull("message " + i + " was not received from the second server", m);
+         }
+      }
+      finally
+      {
+         try
+         {
+            if (locator != null)
+            {
+               locator.close();
+            }
+         }
+         finally
+         {
+            // stop the extra server even if closing the locator fails
+            server2.stop();
+         }
+      }
+   }
+   // Package protected ---------------------------------------------
+   /**
+    * Callback that lets each scale-down test decide how the scale down is invoked on the
+    * management control it is given.
+    */
+   interface ScaleDownHandler
+   {
+      void scaleDown(HornetQServerControl control) throws Exception;
+   }
    // Protected -----------------------------------------------------
 
    @Override
@@ -964,13 +1060,13 @@ public class HornetQServerControlTest extends 
ManagementTestBase
                                                    params,
                                                    RandomUtil.randomString());
 
-      conf = createDefaultConfig(false);
-      conf.setSecurityEnabled(false);
-      conf.setJMXManagementEnabled(true);
-      conf.getAcceptorConfigurations().clear();
-      conf.getAcceptorConfigurations().add(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()));
+      conf = createDefaultConfig(false)
+         .setSecurityEnabled(false)
+         .setJMXManagementEnabled(true)
+         .clearAcceptorConfigurations()
+         .addAcceptorConfiguration(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()))
+         .addConnectorConfiguration(connectorConfig.getName(), 
connectorConfig);
       server = HornetQServers.newHornetQServer(conf, mbeanServer, true);
-      conf.getConnectorConfigurations().put(connectorConfig.getName(), 
connectorConfig);
       server.start();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
index 42f2c7e..41051f0 100644
--- 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
+++ 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
@@ -96,6 +96,17 @@ public class HornetQServerControlUsingCoreTest extends 
HornetQServerControlTest
 
    }
 
+   // the core messaging proxy doesn't work when the server is stopped so we can't run these 2 tests
+   @Override
+   public void testScaleDownWithOutConnector() throws Exception
+   {
+      // intentionally empty: replaces the inherited test body with a no-op in this subclass
+   }
+
+   @Override
+   public void testScaleDownWithConnector() throws Exception
+   {
+      // intentionally empty: replaces the inherited test body with a no-op in this subclass
+   }
+
    @Override
    protected HornetQServerControl createManagementControl() throws Exception
    {
@@ -107,6 +118,12 @@ public class HornetQServerControlUsingCoreTest extends 
HornetQServerControlTest
 
          }
 
+         @Override
+         public void scaleDown(String connector) throws Exception
+         {
+            throw new UnsupportedOperationException();
+         }
+
          private final CoreMessagingProxy proxy = new 
CoreMessagingProxy(session, ResourceNames.CORE_SERVER);
 
          public boolean isSharedStore()
@@ -119,6 +136,16 @@ public class HornetQServerControlUsingCoreTest extends 
HornetQServerControlTest
             return (Boolean) 
proxy.invokeOperation("closeConnectionsForAddress", ipAddress);
          }
 
+         public boolean closeConsumerConnectionsForAddress(final String 
address) throws Exception
+         {
+            return (Boolean) 
proxy.invokeOperation("closeConsumerConnectionsForAddress", address);
+         }
+
+         public boolean closeConnectionsForUser(final String userName) throws 
Exception
+         {
+            return (Boolean) proxy.invokeOperation("closeConnectionsForUser", 
userName);
+         }
+
          public boolean commitPreparedTransaction(final String 
transactionAsBase64) throws Exception
          {
             return (Boolean) 
proxy.invokeOperation("commitPreparedTransaction", transactionAsBase64);
@@ -356,7 +383,7 @@ public class HornetQServerControlUsingCoreTest extends 
HornetQServerControlTest
 
          public void setScaleDown(boolean scaleDown) throws Exception
          {
-            proxy.invokeOperation("setScaleDown", scaleDown);
+            proxy.invokeOperation("setEnabled", scaleDown);
          }
 
          public boolean isScaleDown()
@@ -548,7 +575,10 @@ public class HornetQServerControlUsingCoreTest extends 
HornetQServerControlTest
                                         @Parameter(desc = "the maximum 
redelivery delay", name = "maxRedeliveryDelay") long maxRedeliveryDelay,
                                         @Parameter(desc = "the redistribution 
delay", name = "redistributionDelay") long redistributionDelay,
                                         @Parameter(desc = "do we send to the 
DLA when there is no where to route the message", name = "sendToDLAOnNoRoute") 
boolean sendToDLAOnNoRoute,
-                                        @Parameter(desc = "the policy to use 
when the address is full", name = "addressFullMessagePolicy") String 
addressFullMessagePolicy) throws Exception
+                                        @Parameter(desc = "the policy to use 
when the address is full", name = "addressFullMessagePolicy") String 
addressFullMessagePolicy,
+                                        @Parameter(desc = "when a consumer 
falls below this threshold in terms of messages consumed per second it will be 
considered 'slow'", name = "slowConsumerThreshold") long slowConsumerThreshold,
+                                        @Parameter(desc = "how often (in 
seconds) to check for slow consumers", name = "slowConsumerCheckPeriod") long 
slowConsumerCheckPeriod,
+                                        @Parameter(desc = "the policy to use 
when a slow consumer is detected", name = "slowConsumerPolicy") String 
slowConsumerPolicy) throws Exception
          {
             proxy.invokeOperation("addAddressSettings",
                                   addressMatch,
@@ -565,7 +595,10 @@ public class HornetQServerControlUsingCoreTest extends 
HornetQServerControlTest
                                   maxRedeliveryDelay,
                                   redistributionDelay,
                                   sendToDLAOnNoRoute,
-                                  addressFullMessagePolicy);
+                                  addressFullMessagePolicy,
+                                  slowConsumerThreshold,
+                                  slowConsumerCheckPeriod,
+                                  slowConsumerPolicy);
          }
 
          public void removeAddressSettings(String addressMatch) throws 
Exception

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/JMXDomainTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/JMXDomainTest.java
 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/JMXDomainTest.java
index f1fe3a0..44d1eed 100644
--- 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/JMXDomainTest.java
+++ 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/JMXDomainTest.java
@@ -12,15 +12,10 @@
  */
 package org.hornetq.tests.integration.management;
 
-import org.junit.Test;
-
 import java.util.HashMap;
 import java.util.Map;
 
-import javax.management.ObjectName;
-
 import org.hornetq.api.config.HornetQDefaultConfiguration;
-import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.management.ObjectNameBuilder;
 import org.hornetq.core.config.Configuration;
@@ -28,45 +23,35 @@ import 
org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
 import org.hornetq.core.remoting.impl.invm.TransportConstants;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.HornetQServers;
+import org.junit.After;
+import org.junit.Test;
 
 /**
  * A JMXDomainTest
  *
  * @author <a href="mailto:[email protected]">Jeff Mesnil</a>
- *
- *
  */
 public class JMXDomainTest extends ManagementTestBase
 {
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
+   HornetQServer server_0 = null;
+   HornetQServer server_1 = null;
 
    @Test
    public void test2HornetQServersManagedFrom1MBeanServer() throws Exception
    {
-
-      Configuration config_0 = createDefaultConfig();
-      config_0.setJMXManagementEnabled(true);
+      Configuration config_0 = createDefaultConfig()
+         .setJMXManagementEnabled(true);
 
       String jmxDomain_1 = HornetQDefaultConfiguration.getDefaultJmxDomain() + 
".1";
 
-      Configuration config_1 = createBasicConfig();
       Map<String, Object> params = new HashMap<String, Object>();
       params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
-      config_1.getAcceptorConfigurations().add(new 
TransportConfiguration(InVMAcceptorFactory.class.getName(), params));
-      config_1.setJMXDomain(jmxDomain_1);
-      config_1.setJMXManagementEnabled(true);
+      Configuration config_1 = createBasicConfig()
+         .addAcceptorConfiguration(new 
TransportConfiguration(InVMAcceptorFactory.class.getName(), params))
+         .setJMXDomain(jmxDomain_1);
 
-      HornetQServer server_0 = HornetQServers.newHornetQServer(config_0, 
mbeanServer, false);
-      HornetQServer server_1 = HornetQServers.newHornetQServer(config_1, 
mbeanServer, false);
+      server_0 = HornetQServers.newHornetQServer(config_0, mbeanServer, false);
+      server_1 = HornetQServers.newHornetQServer(config_1, mbeanServer, false);
 
       ObjectNameBuilder builder_0 = ObjectNameBuilder.DEFAULT;
       ObjectNameBuilder builder_1 = ObjectNameBuilder.create(jmxDomain_1);
@@ -93,94 +78,22 @@ public class JMXDomainTest extends ManagementTestBase
 
       checkNoResource(builder_0.getHornetQServerObjectName());
       checkNoResource(builder_1.getHornetQServerObjectName());
-
    }
 
-   @Test
-   public void testDefaultObjectName() throws Exception
+   @Override
+   @After
+   public void tearDown() throws Exception
    {
-      ObjectName objectName = 
ObjectNameBuilder.DEFAULT.getJMSServerObjectName();
-      ObjectName defaultValue = new 
ObjectName("jboss.as:subsystem=messaging,hornetq-server=default");
-
-      assertEquals(defaultValue, objectName);
-
-      objectName = ObjectNameBuilder.DEFAULT.getJMSQueueObjectName("A");
-      defaultValue = new 
ObjectName("jboss.as:subsystem=messaging,hornetq-server=default,jms-queue=\"A\"");
-
-      assertEquals(defaultValue, objectName);
-
-      objectName = ObjectNameBuilder.DEFAULT.getAcceptorObjectName("netty");
-      System.out.println("value: " + objectName);
-      defaultValue = new 
ObjectName("jboss.as:subsystem=messaging,hornetq-server=default,remote-acceptor=\"netty\"");
-
-      assertEquals(defaultValue, objectName);
-
-      objectName = ObjectNameBuilder.DEFAULT.getAddressObjectName(new 
SimpleString("jms.queue.M"));
-      System.out.println("value: " + objectName);
-      defaultValue = new 
ObjectName("jboss.as:subsystem=messaging,hornetq-server=default,core-address=\"jms.queue.M\"");
-
-      assertEquals(defaultValue, objectName);
+      if (server_0 != null)
+      {
+         server_0.stop();
+      }
 
-      objectName = ObjectNameBuilder.DEFAULT.getBridgeObjectName("mybridge");
-      System.out.println("value: " + objectName);
-      defaultValue = new 
ObjectName("jboss.as:subsystem=messaging,hornetq-server=default,bridge=\"mybridge\"");
+      if (server_1 != null)
+      {
+         server_1.stop();
+      }
 
-      assertEquals(defaultValue, objectName);
-
-      objectName = 
ObjectNameBuilder.DEFAULT.getBroadcastGroupObjectName("mybroadcastgroup");
-      System.out.println("value: " + objectName);
-      defaultValue = new 
ObjectName("jboss.as:subsystem=messaging,hornetq-server=default,broadcast-group=\"mybroadcastgroup\"");
-
-      assertEquals(defaultValue, objectName);
-
-      objectName = 
ObjectNameBuilder.DEFAULT.getClusterConnectionObjectName("my-cluster-connection");
-      System.out.println("value: " + objectName);
-      defaultValue = new 
ObjectName("jboss.as:subsystem=messaging,hornetq-server=default,cluster-connection=\"my-cluster-connection\"");
-
-      assertEquals(defaultValue, objectName);
-
-      objectName = 
ObjectNameBuilder.DEFAULT.getConnectionFactoryObjectName("connectionFactory");
-      System.out.println("value: " + objectName);
-      defaultValue = new 
ObjectName("jboss.as:subsystem=messaging,hornetq-server=default,connection-factory=\"connectionFactory\"");
-
-      assertEquals(defaultValue, objectName);
-
-      objectName = 
ObjectNameBuilder.DEFAULT.getDiscoveryGroupObjectName("my-discovery-group");
-      System.out.println("value: " + objectName);
-      defaultValue = new 
ObjectName("jboss.as:subsystem=messaging,hornetq-server=default,discovery-group=\"my-discovery-group\"");
-
-      assertEquals(defaultValue, objectName);
-
-      objectName = ObjectNameBuilder.DEFAULT.getDivertObjectName("my-divert");
-      System.out.println("value: " + objectName);
-      defaultValue = new 
ObjectName("jboss.as:subsystem=messaging,hornetq-server=default,divert=\"my-divert\"");
-
-      assertEquals(defaultValue, objectName);
-
-      objectName = ObjectNameBuilder.DEFAULT.getHornetQServerObjectName();
-      System.out.println("value: " + objectName);
-      defaultValue = new 
ObjectName("jboss.as:subsystem=messaging,hornetq-server=default");
-
-      assertEquals(defaultValue, objectName);
-
-      objectName = ObjectNameBuilder.DEFAULT.getJMSTopicObjectName("my-topic");
-      System.out.println("value: " + objectName);
-      defaultValue = new 
ObjectName("jboss.as:subsystem=messaging,hornetq-server=default,jms-topic=\"my-topic\"");
-
-      assertEquals(defaultValue, objectName);
-
-      objectName = ObjectNameBuilder.DEFAULT.getQueueObjectName(new 
SimpleString("some.address"), new SimpleString("some.queue"));
-      System.out.println("value: " + objectName);
-      defaultValue = new 
ObjectName("jboss.as:subsystem=messaging,hornetq-server=default,address=\"some.address\",runtime-queue=\"some.queue\"");
-
-      assertEquals(defaultValue, objectName);
+      super.tearDown();
    }
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementActivationTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementActivationTest.java
 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementActivationTest.java
index 847f675..c672fbc 100644
--- 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementActivationTest.java
+++ 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementActivationTest.java
@@ -83,7 +83,10 @@ public class ManagementActivationTest extends 
FailoverTestBase
       List<String> connectorNames = new ArrayList<String>();
       connectorNames.add(connectorName);
 
-      ConnectionFactoryConfiguration config = new 
ConnectionFactoryConfigurationImpl("test", false, connectorNames, 
"/myConnectionFactory");
+      ConnectionFactoryConfiguration config = new 
ConnectionFactoryConfigurationImpl()
+         .setName("test")
+         .setConnectorNames(connectorNames)
+         .setBindings("/myConnectionFactory");
       backupJmsServer.createConnectionFactory(true, config, 
"/myConnectionFactory");
 
       boolean exception = false;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementServiceImplTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementServiceImplTest.java
 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementServiceImplTest.java
index 085c256..82ec49c 100644
--- 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementServiceImplTest.java
+++ 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementServiceImplTest.java
@@ -46,8 +46,8 @@ public class ManagementServiceImplTest extends UnitTestCase
       String queue = RandomUtil.randomString();
       String address = RandomUtil.randomString();
 
-      Configuration conf = createBasicConfig();
-      conf.setJMXManagementEnabled(false);
+      Configuration conf = createBasicConfig()
+         .setJMXManagementEnabled(false);
 
       HornetQServer server = HornetQServers.newHornetQServer(conf, false);
       server.start();
@@ -66,8 +66,8 @@ public class ManagementServiceImplTest extends UnitTestCase
    @Test
    public void testHandleManagementMessageWithOperationWhichFails() throws 
Exception
    {
-      Configuration conf = createBasicConfig();
-      conf.setJMXManagementEnabled(false);
+      Configuration conf = createBasicConfig()
+         .setJMXManagementEnabled(false);
 
       HornetQServer server = HornetQServers.newHornetQServer(conf, false);
       server.start();
@@ -86,8 +86,8 @@ public class ManagementServiceImplTest extends UnitTestCase
    @Test
    public void testHandleManagementMessageWithUnknowResource() throws Exception
    {
-      Configuration conf = createBasicConfig();
-      conf.setJMXManagementEnabled(false);
+      Configuration conf = createBasicConfig()
+         .setJMXManagementEnabled(false);
 
       HornetQServer server = HornetQServers.newHornetQServer(conf, false);
       server.start();
@@ -106,8 +106,8 @@ public class ManagementServiceImplTest extends UnitTestCase
    @Test
    public void testHandleManagementMessageWithUnknownAttribute() throws 
Exception
    {
-      Configuration conf = createBasicConfig();
-      conf.setJMXManagementEnabled(false);
+      Configuration conf = createBasicConfig()
+         .setJMXManagementEnabled(false);
 
       HornetQServer server = HornetQServers.newHornetQServer(conf, false);
       server.start();
@@ -127,8 +127,8 @@ public class ManagementServiceImplTest extends UnitTestCase
    @Test
    public void testHandleManagementMessageWithKnownAttribute() throws Exception
    {
-      Configuration conf = createBasicConfig();
-      conf.setJMXManagementEnabled(false);
+      Configuration conf = createBasicConfig()
+         .setJMXManagementEnabled(false);
 
       HornetQServer server = HornetQServers.newHornetQServer(conf, false);
       server.start();
@@ -148,8 +148,8 @@ public class ManagementServiceImplTest extends UnitTestCase
    @Test
    public void testGetResources() throws Exception
    {
-      Configuration conf = createBasicConfig();
-      conf.setJMXManagementEnabled(false);
+      Configuration conf = createBasicConfig()
+         .setJMXManagementEnabled(false);
       ManagementServiceImpl managementService = new 
ManagementServiceImpl(null, conf);
       managementService.setStorageManager(new NullStorageManager());
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementTestBase.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementTestBase.java
 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementTestBase.java
index 2a4a762..5d2452a 100644
--- 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementTestBase.java
+++ 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementTestBase.java
@@ -12,6 +12,7 @@
  */
 package org.hornetq.tests.integration.management;
 import org.hornetq.api.core.management.QueueControl;
+import org.hornetq.api.jms.management.JMSQueueControl;
 import org.junit.Before;
 import org.junit.After;
 
@@ -121,6 +122,29 @@ public abstract class ManagementTestBase extends 
ServiceTestBase
       return queueControl;
    }
 
+   protected long getMessageCount(JMSQueueControl control) throws Exception
+   {
+      control.flushExecutor();
+      return control.getMessageCount();
+   }
+
+   protected long getMessagesAdded(JMSQueueControl control) throws Exception
+   {
+      control.flushExecutor();
+      return control.getMessagesAdded();
+   }
+
+   protected long getMessageCount(QueueControl control) throws Exception
+   {
+      control.flushExecutor();
+      return control.getMessageCount();
+   }
+
+   protected long getMessagesAdded(QueueControl control) throws Exception
+   {
+      control.flushExecutor();
+      return control.getMessagesAdded();
+   }
 
    // Private -------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementWithPagingServerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementWithPagingServerTest.java
 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementWithPagingServerTest.java
index 9ecff90..53ea2bf 100644
--- 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementWithPagingServerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementWithPagingServerTest.java
@@ -198,11 +198,8 @@ public class ManagementWithPagingServerTest extends 
ManagementTestBase
    {
       super.setUp();
 
-      Configuration conf = createBasicConfig();
-      conf.setSecurityEnabled(false);
-      conf.setJMXManagementEnabled(true);
-
-      conf.getAcceptorConfigurations().add(new 
TransportConfiguration(INVM_ACCEPTOR_FACTORY));
+      Configuration conf = createBasicConfig()
+         .addAcceptorConfiguration(new 
TransportConfiguration(INVM_ACCEPTOR_FACTORY));
 
       server = addServer(HornetQServers.newHornetQServer(conf, mbeanServer, 
true));
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementWithStompTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementWithStompTest.java
 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementWithStompTest.java
index e2694a9..2bf9096 100644
--- 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementWithStompTest.java
+++ 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/ManagementWithStompTest.java
@@ -171,15 +171,15 @@ public class ManagementWithStompTest extends 
ManagementTestBase
    {
       super.setUp();
 
-      Configuration conf = createBasicConfig();
-      conf.setSecurityEnabled(false);
-      conf.setJMXManagementEnabled(true);
       Map<String, Object> params = new HashMap<String, Object>();
-      params.put(TransportConstants.PROTOCOL_PROP_NAME, 
StompProtocolManagerFactory.STOMP_PROTOCOL_NAME);
+      params.put(TransportConstants.PROTOCOLS_PROP_NAME, 
StompProtocolManagerFactory.STOMP_PROTOCOL_NAME);
       params.put(TransportConstants.PORT_PROP_NAME, 
TransportConstants.DEFAULT_STOMP_PORT);
       TransportConfiguration stompTransport = new 
TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
-      conf.getAcceptorConfigurations().add(stompTransport);
-      conf.getAcceptorConfigurations().add(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()));
+
+      Configuration conf = createBasicConfig()
+         .addAcceptorConfiguration(stompTransport)
+         .addAcceptorConfiguration(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()));
+
       server = HornetQServers.newHornetQServer(conf, mbeanServer, false, 
"brianm", "wombats");
 
       server.start();

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/NotificationTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/NotificationTest.java
 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/NotificationTest.java
index 4c20a89..d0f7a8f 100644
--- 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/NotificationTest.java
+++ 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/NotificationTest.java
@@ -36,10 +36,10 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.hornetq.api.core.management.NotificationType.BINDING_ADDED;
-import static org.hornetq.api.core.management.NotificationType.BINDING_REMOVED;
-import static org.hornetq.api.core.management.NotificationType.CONSUMER_CLOSED;
-import static 
org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED;
+import static 
org.hornetq.api.core.management.CoreNotificationType.BINDING_ADDED;
+import static 
org.hornetq.api.core.management.CoreNotificationType.BINDING_REMOVED;
+import static 
org.hornetq.api.core.management.CoreNotificationType.CONSUMER_CLOSED;
+import static 
org.hornetq.api.core.management.CoreNotificationType.CONSUMER_CREATED;
 
 /**
  * A NotificationTest
@@ -252,11 +252,9 @@ public class NotificationTest extends UnitTestCase
    {
       super.setUp();
 
-      Configuration conf = createBasicConfig();
-      conf.setSecurityEnabled(false);
-      // the notifications are independent of JMX
-      conf.setJMXManagementEnabled(false);
-      conf.getAcceptorConfigurations().add(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()));
+      Configuration conf = createBasicConfig()
+         // the notifications are independent of JMX
+         .addAcceptorConfiguration(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()));
       service = HornetQServers.newHornetQServer(conf, false);
       service.start();
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/QueueControlTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/QueueControlTest.java
 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/QueueControlTest.java
index c05fff5..31bea0f 100644
--- 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/QueueControlTest.java
+++ 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/QueueControlTest.java
@@ -17,6 +17,8 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import javax.management.Notification;
+
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
@@ -29,16 +31,20 @@ import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.client.MessageHandler;
 import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.api.core.management.CoreNotificationType;
 import org.hornetq.api.core.management.DayCounterInfo;
 import org.hornetq.api.core.management.HornetQServerControl;
 import org.hornetq.api.core.management.MessageCounterInfo;
+import org.hornetq.api.core.management.ObjectNameBuilder;
 import org.hornetq.api.core.management.QueueControl;
 import org.hornetq.core.config.Configuration;
+import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.messagecounter.impl.MessageCounterManagerImpl;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.HornetQServers;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.integration.jms.server.management.JMSUtil;
 import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.utils.json.JSONArray;
 import org.junit.After;
@@ -247,15 +253,15 @@ public class QueueControlTest extends ManagementTestBase
       session.createQueue(address, queue, null, false);
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(0, queueControl.getMessageCount());
+      Assert.assertEquals(0, getMessageCount(queueControl));
 
       ClientProducer producer = session.createProducer(address);
       producer.send(session.createMessage(false));
-      Assert.assertEquals(1, queueControl.getMessageCount());
+      Assert.assertEquals(1, getMessageCount(queueControl));
 
       ManagementTestBase.consumeMessages(1, session, queue);
 
-      Assert.assertEquals(0, queueControl.getMessageCount());
+      Assert.assertEquals(0, getMessageCount(queueControl));
 
       session.deleteQueue(queue);
    }
@@ -269,7 +275,7 @@ public class QueueControlTest extends ManagementTestBase
       session.createQueue(address, queue, null, false);
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(0, queueControl.getMessageCount());
+      Assert.assertEquals(0, getMessageCount(queueControl));
 
       // It's empty, so it's supposed to be like this
       assertEquals("[{}]", queueControl.getFirstMessageAsJSON());
@@ -291,17 +297,43 @@ public class QueueControlTest extends ManagementTestBase
       session.createQueue(address, queue, null, false);
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(0, queueControl.getMessagesAdded());
+      Assert.assertEquals(0, getMessagesAdded(queueControl));
 
       ClientProducer producer = session.createProducer(address);
       producer.send(session.createMessage(false));
-      Assert.assertEquals(1, queueControl.getMessagesAdded());
+      Assert.assertEquals(1, getMessagesAdded(queueControl));
       producer.send(session.createMessage(false));
-      Assert.assertEquals(2, queueControl.getMessagesAdded());
+      Assert.assertEquals(2, getMessagesAdded(queueControl));
 
       ManagementTestBase.consumeMessages(2, session, queue);
 
-      Assert.assertEquals(2, queueControl.getMessagesAdded());
+      Assert.assertEquals(2, getMessagesAdded(queueControl));
+
+      session.deleteQueue(queue);
+   }
+
+   @Test
+   public void testGetMessagesAcknowledged() throws Exception
+   {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+
+      session.createQueue(address, queue, null, false);
+
+      QueueControl queueControl = createManagementControl(address, queue);
+      Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
+
+      ClientProducer producer = session.createProducer(address);
+      producer.send(session.createMessage(false));
+      ManagementTestBase.consumeMessages(1, session, queue);
+      Assert.assertEquals(1, queueControl.getMessagesAcknowledged());
+      producer.send(session.createMessage(false));
+      ManagementTestBase.consumeMessages(1, session, queue);
+      Assert.assertEquals(2, queueControl.getMessagesAcknowledged());
+
+//      ManagementTestBase.consumeMessages(2, session, queue);
+
+//      Assert.assertEquals(2, getMessagesAdded(queueControl));
 
       session.deleteQueue(queue);
    }
@@ -771,12 +803,12 @@ public class QueueControlTest extends ManagementTestBase
       producer.send(message);
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(1, queueControl.getMessageCount());
+      Assert.assertEquals(1, getMessageCount(queueControl));
 
       // moved all messages to otherQueue
       int movedMessagesCount = queueControl.moveMessages(null, 
otherQueue.toString());
       Assert.assertEquals(1, movedMessagesCount);
-      Assert.assertEquals(0, queueControl.getMessageCount());
+      Assert.assertEquals(0, getMessageCount(queueControl));
 
       // check there is no message to consume from queue
       ManagementTestBase.consumeMessages(0, session, queue);
@@ -811,7 +843,7 @@ public class QueueControlTest extends ManagementTestBase
       producer.send(message);
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(1, queueControl.getMessageCount());
+      Assert.assertEquals(1, getMessageCount(queueControl));
 
       // moved all messages to unknown queue
       try
@@ -822,7 +854,7 @@ public class QueueControlTest extends ManagementTestBase
       catch (Exception e)
       {
       }
-      Assert.assertEquals(1, queueControl.getMessageCount());
+      Assert.assertEquals(1, getMessageCount(queueControl));
 
       ManagementTestBase.consumeMessages(1, session, queue);
 
@@ -863,12 +895,12 @@ public class QueueControlTest extends ManagementTestBase
       producer.send(unmatchingMessage);
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(2, queueControl.getMessageCount());
+      Assert.assertEquals(2, getMessageCount(queueControl));
 
       // moved matching messages to otherQueue
       int movedMatchedMessagesCount = queueControl.moveMessages(key + " =" + 
matchingValue, otherQueue.toString());
       Assert.assertEquals(1, movedMatchedMessagesCount);
-      Assert.assertEquals(1, queueControl.getMessageCount());
+      Assert.assertEquals(1, getMessageCount(queueControl));
 
       // consume the unmatched message from queue
       ClientConsumer consumer = session.createConsumer(queue);
@@ -908,8 +940,8 @@ public class QueueControlTest extends ManagementTestBase
 
       QueueControl queueControl = createManagementControl(address, queue);
       QueueControl otherQueueControl = createManagementControl(otherAddress, 
otherQueue);
-      Assert.assertEquals(2, queueControl.getMessageCount());
-      Assert.assertEquals(0, otherQueueControl.getMessageCount());
+      Assert.assertEquals(2, getMessageCount(queueControl));
+      Assert.assertEquals(0, getMessageCount(otherQueueControl));
 
       // the message IDs are set on the server
       Map<String, Object>[] messages = queueControl.listMessages(null);
@@ -918,8 +950,8 @@ public class QueueControlTest extends ManagementTestBase
 
       boolean moved = queueControl.moveMessage(messageID, 
otherQueue.toString());
       Assert.assertTrue(moved);
-      Assert.assertEquals(1, queueControl.getMessageCount());
-      Assert.assertEquals(1, otherQueueControl.getMessageCount());
+      Assert.assertEquals(1, getMessageCount(queueControl));
+      Assert.assertEquals(1, getMessageCount(otherQueueControl));
 
       ManagementTestBase.consumeMessages(1, session, queue);
       ManagementTestBase.consumeMessages(1, session, otherQueue);
@@ -942,7 +974,7 @@ public class QueueControlTest extends ManagementTestBase
       producer.send(session.createMessage(false));
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(1, queueControl.getMessageCount());
+      Assert.assertEquals(1, getMessageCount(queueControl));
 
       // the message IDs are set on the server
       Map<String, Object>[] messages = queueControl.listMessages(null);
@@ -958,7 +990,7 @@ public class QueueControlTest extends ManagementTestBase
       catch (Exception e)
       {
       }
-      Assert.assertEquals(1, queueControl.getMessageCount());
+      Assert.assertEquals(1, getMessageCount(queueControl));
 
       ManagementTestBase.consumeMessages(1, session, queue);
 
@@ -995,12 +1027,12 @@ public class QueueControlTest extends ManagementTestBase
       producer.send(unmatchingMessage);
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(2, queueControl.getMessageCount());
+      Assert.assertEquals(2, getMessageCount(queueControl));
 
       // removed matching messages to otherQueue
       int removedMatchedMessagesCount = queueControl.removeMessages(key + " =" 
+ matchingValue);
       Assert.assertEquals(1, removedMatchedMessagesCount);
-      Assert.assertEquals(1, queueControl.getMessageCount());
+      Assert.assertEquals(1, getMessageCount(queueControl));
 
       // consume the unmatched message from queue
       ClientConsumer consumer = session.createConsumer(queue);
@@ -1040,12 +1072,12 @@ public class QueueControlTest extends ManagementTestBase
       producer.send(unmatchingMessage);
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(2, queueControl.getMessageCount());
+      Assert.assertEquals(2, getMessageCount(queueControl));
 
       // removed matching messages to otherQueue
       int removedMatchedMessagesCount = queueControl.removeMessages(5, key + " 
=" + matchingValue);
       Assert.assertEquals(1, removedMatchedMessagesCount);
-      Assert.assertEquals(1, queueControl.getMessageCount());
+      Assert.assertEquals(1, getMessageCount(queueControl));
 
       // consume the unmatched message from queue
       ClientConsumer consumer = session.createConsumer(queue);
@@ -1077,12 +1109,12 @@ public class QueueControlTest extends ManagementTestBase
       producer.send(session.createMessage(false));
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(2, queueControl.getMessageCount());
+      Assert.assertEquals(2, getMessageCount(queueControl));
 
       // removed matching messages to otherQueue
       int removedMatchedMessagesCount = queueControl.removeMessages(null);
       Assert.assertEquals(2, removedMatchedMessagesCount);
-      Assert.assertEquals(0, queueControl.getMessageCount());
+      Assert.assertEquals(0, getMessageCount(queueControl));
 
       session.deleteQueue(queue);
    }
@@ -1101,12 +1133,12 @@ public class QueueControlTest extends ManagementTestBase
       producer.send(session.createMessage(false));
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(2, queueControl.getMessageCount());
+      Assert.assertEquals(2, getMessageCount(queueControl));
 
       // removed matching messages to otherQueue
       int removedMatchedMessagesCount = queueControl.removeMessages("");
       Assert.assertEquals(2, removedMatchedMessagesCount);
-      Assert.assertEquals(0, queueControl.getMessageCount());
+      Assert.assertEquals(0, getMessageCount(queueControl));
 
       session.deleteQueue(queue);
    }
@@ -1125,7 +1157,7 @@ public class QueueControlTest extends ManagementTestBase
       producer.send(session.createMessage(false));
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(2, queueControl.getMessageCount());
+      Assert.assertEquals(2, getMessageCount(queueControl));
 
       // the message IDs are set on the server
       Map<String, Object>[] messages = queueControl.listMessages(null);
@@ -1135,7 +1167,7 @@ public class QueueControlTest extends ManagementTestBase
       // delete 1st message
       boolean deleted = queueControl.removeMessage(messageID);
       Assert.assertTrue(deleted);
-      Assert.assertEquals(1, queueControl.getMessageCount());
+      Assert.assertEquals(1, getMessageCount(queueControl));
 
       // check there is a single message to consume from queue
       ManagementTestBase.consumeMessages(1, session, queue);
@@ -1144,6 +1176,48 @@ public class QueueControlTest extends ManagementTestBase
    }
 
    @Test
+   public void testRemoveScheduledMessage() throws Exception
+   {
+      // Removing one of two scheduled messages must leave exactly one message to be delivered.
+      final SimpleString address = RandomUtil.randomSimpleString();
+      final SimpleString queue = RandomUtil.randomSimpleString();
+
+      session.createQueue(address, queue, null, false);
+      final ClientProducer producer = session.createProducer(address);
+
+      // send 2 messages on queue, both scheduled for the same delivery time
+      final long deliveryTime = System.currentTimeMillis() + 5000;
+      for (int i = 0; i < 2; i++)
+      {
+         ClientMessage scheduled = session.createMessage(true);
+         scheduled.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, deliveryTime);
+         producer.send(scheduled);
+      }
+
+      final QueueControl queueControl = createManagementControl(address, queue);
+      Assert.assertEquals(2, queueControl.getScheduledCount());
+
+      // the message IDs are set on the server, so read the first one back through the control
+      final Map<String, Object>[] scheduledMessages = queueControl.listScheduledMessages();
+      Assert.assertEquals(2, scheduledMessages.length);
+      final long firstMessageID = (Long) scheduledMessages[0].get("messageID");
+
+      // delete 1st message; only the second one may remain scheduled
+      Assert.assertTrue(queueControl.removeMessage(firstMessageID));
+      Assert.assertEquals(1, queueControl.getScheduledCount());
+
+      // poll until the remaining message is no longer scheduled or its delivery time has passed
+      while (System.currentTimeMillis() < deliveryTime && queueControl.getScheduledCount() == 1)
+      {
+         Thread.sleep(100);
+      }
+
+      // check there is a single message to consume from queue
+      ManagementTestBase.consumeMessages(1, session, queue);
+
+      session.deleteQueue(queue);
+   }
+
+   @Test
    public void testRemoveMessage2() throws Exception
    {
       SimpleString address = RandomUtil.randomSimpleString();
@@ -1172,7 +1246,7 @@ public class QueueControlTest extends ManagementTestBase
       }
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(100, queueControl.getMessageCount());
+      Assert.assertEquals(100, getMessageCount(queueControl));
 
       // the message IDs are set on the server
       Map<String, Object>[] messages = queueControl.listMessages(null);
@@ -1183,7 +1257,7 @@ public class QueueControlTest extends ManagementTestBase
       // delete 1st message
       boolean deleted = queueControl.removeMessage(messageID);
       Assert.assertTrue(deleted);
-      Assert.assertEquals(99, queueControl.getMessageCount());
+      Assert.assertEquals(99, getMessageCount(queueControl));
 
       cons.close();
 
@@ -1216,7 +1290,7 @@ public class QueueControlTest extends ManagementTestBase
       producer.send(matchingMessage);
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(3, queueControl.getMessageCount());
+      Assert.assertEquals(3, getMessageCount(queueControl));
 
       Assert.assertEquals(2, queueControl.countMessages(key + " =" + 
matchingValue));
       Assert.assertEquals(1, queueControl.countMessages(key + " =" + 
unmatchingValue));
@@ -1264,7 +1338,7 @@ public class QueueControlTest extends ManagementTestBase
 
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(110, queueControl.getMessageCount());
+      Assert.assertEquals(110, getMessageCount(queueControl));
 
 
       Assert.assertEquals(0, queueControl.countMessages("nonExistentProperty 
like \'%Temp/88\'"));
@@ -1299,11 +1373,11 @@ public class QueueControlTest extends ManagementTestBase
       producer.send(unmatchingMessage);
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(2, queueControl.getMessageCount());
+      Assert.assertEquals(2, getMessageCount(queueControl));
 
       int expiredMessagesCount = queueControl.expireMessages(key + " =" + 
matchingValue);
       Assert.assertEquals(1, expiredMessagesCount);
-      Assert.assertEquals(1, queueControl.getMessageCount());
+      Assert.assertEquals(1, getMessageCount(queueControl));
 
       // consume the unmatched message from queue
       ClientConsumer consumer = session.createConsumer(queue);
@@ -1339,8 +1413,8 @@ public class QueueControlTest extends ManagementTestBase
 
       QueueControl queueControl = createManagementControl(address, queue);
       QueueControl expiryQueueControl = createManagementControl(expiryAddress, 
expiryQueue);
-      Assert.assertEquals(1, queueControl.getMessageCount());
-      Assert.assertEquals(0, expiryQueueControl.getMessageCount());
+      Assert.assertEquals(1, getMessageCount(queueControl));
+      Assert.assertEquals(0, getMessageCount(expiryQueueControl));
 
       // the message IDs are set on the server
       Map<String, Object>[] messages = queueControl.listMessages(null);
@@ -1350,8 +1424,8 @@ public class QueueControlTest extends ManagementTestBase
       queueControl.setExpiryAddress(expiryAddress.toString());
       boolean expired = queueControl.expireMessage(messageID);
       Assert.assertTrue(expired);
-      Assert.assertEquals(0, queueControl.getMessageCount());
-      Assert.assertEquals(1, expiryQueueControl.getMessageCount());
+      Assert.assertEquals(0, getMessageCount(queueControl));
+      Assert.assertEquals(1, getMessageCount(expiryQueueControl));
 
       ManagementTestBase.consumeMessages(0, session, queue);
       ManagementTestBase.consumeMessages(1, session, expiryQueue);
@@ -1379,7 +1453,7 @@ public class QueueControlTest extends ManagementTestBase
 
       QueueControl queueControl = createManagementControl(address, queue);
       QueueControl deadLetterQueueControl = 
createManagementControl(deadLetterAddress, deadLetterQueue);
-      Assert.assertEquals(2, queueControl.getMessageCount());
+      Assert.assertEquals(2, getMessageCount(queueControl));
 
       // the message IDs are set on the server
       Map<String, Object>[] messages = queueControl.listMessages(null);
@@ -1388,11 +1462,11 @@ public class QueueControlTest extends ManagementTestBase
 
       queueControl.setDeadLetterAddress(deadLetterAddress.toString());
 
-      Assert.assertEquals(0, deadLetterQueueControl.getMessageCount());
+      Assert.assertEquals(0, getMessageCount(deadLetterQueueControl));
       boolean movedToDeadLetterAddress = 
queueControl.sendMessageToDeadLetterAddress(messageID);
       Assert.assertTrue(movedToDeadLetterAddress);
-      Assert.assertEquals(1, queueControl.getMessageCount());
-      Assert.assertEquals(1, deadLetterQueueControl.getMessageCount());
+      Assert.assertEquals(1, getMessageCount(queueControl));
+      Assert.assertEquals(1, getMessageCount(deadLetterQueueControl));
 
       // check there is a single message to consume from queue
       ManagementTestBase.consumeMessages(1, session, queue);
@@ -1421,7 +1495,7 @@ public class QueueControlTest extends ManagementTestBase
       producer.send(message);
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(1, queueControl.getMessageCount());
+      Assert.assertEquals(1, getMessageCount(queueControl));
 
       // the message IDs are set on the server
       Map<String, Object>[] messages = queueControl.listMessages(null);
@@ -1455,7 +1529,7 @@ public class QueueControlTest extends ManagementTestBase
       producer.send(message);
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(1, queueControl.getMessageCount());
+      Assert.assertEquals(1, getMessageCount(queueControl));
 
       // the message IDs are set on the server
       Map<String, Object>[] messages = queueControl.listMessages(null);
@@ -1834,25 +1908,89 @@ public class QueueControlTest extends ManagementTestBase
       session.createQueue(address, queue, null, false);
 
       QueueControl queueControl = createManagementControl(address, queue);
-      Assert.assertEquals(0, queueControl.getMessagesAdded());
+      Assert.assertEquals(0, getMessagesAdded(queueControl));
 
       ClientProducer producer = session.createProducer(address);
       producer.send(session.createMessage(false));
-      Assert.assertEquals(1, queueControl.getMessagesAdded());
+      Assert.assertEquals(1, getMessagesAdded(queueControl));
       producer.send(session.createMessage(false));
-      Assert.assertEquals(2, queueControl.getMessagesAdded());
+      Assert.assertEquals(2, getMessagesAdded(queueControl));
 
       ManagementTestBase.consumeMessages(2, session, queue);
 
-      Assert.assertEquals(2, queueControl.getMessagesAdded());
+      Assert.assertEquals(2, getMessagesAdded(queueControl));
 
       queueControl.resetMessagesAdded();
 
-      Assert.assertEquals(0, queueControl.getMessagesAdded());
+      Assert.assertEquals(0, getMessagesAdded(queueControl));
 
       session.deleteQueue(queue);
    }
 
+   @Test
+   public void testResetMessagesAcknowledged() throws Exception
+   {
+      // The acknowledged counter must go up by one per send/consume round and drop to zero on reset.
+      final SimpleString address = RandomUtil.randomSimpleString();
+      final SimpleString queue = RandomUtil.randomSimpleString();
+
+      session.createQueue(address, queue, null, false);
+
+      final QueueControl queueControl = createManagementControl(address, queue);
+      Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
+
+      final ClientProducer producer = session.createProducer(address);
+      for (int expectedAcks = 1; expectedAcks <= 2; expectedAcks++)
+      {
+         producer.send(session.createMessage(false));
+         ManagementTestBase.consumeMessages(1, session, queue);
+         Assert.assertEquals(expectedAcks, queueControl.getMessagesAcknowledged());
+      }
+
+      queueControl.resetMessagesAcknowledged();
+
+      Assert.assertEquals(0, queueControl.getMessagesAcknowledged());
+
+      session.deleteQueue(queue);
+   }
+
+   // Make sure binding notifications are always received, no matter whether
+   // a queue is created directly on the server or via HornetQServerControl.
+   @Test
+   public void testCreateQueueNotification() throws Exception
+   {
+      // Register a listener on the HornetQ server MBean before any queue is touched.
+      final JMSUtil.JMXListener listener = new JMSUtil.JMXListener();
+      this.mbeanServer.addNotificationListener(ObjectNameBuilder.DEFAULT.getHornetQServerObjectName(), listener, null, null);
+
+      final SimpleString serverQueueName = new SimpleString("newQueue");
+      final String controlQueueName = "newQueue2";
+
+      // Round 1: create and destroy a queue directly on the server.
+      this.server.createQueue(serverQueueName, serverQueueName, null, false, false);
+
+      Notification received = listener.getNotification();
+      System.out.println("got notif: " + received);
+      assertEquals(CoreNotificationType.BINDING_ADDED.toString(), received.getType());
+
+      this.server.destroyQueue(serverQueueName);
+
+      received = listener.getNotification();
+      System.out.println("got notif: " + received);
+      assertEquals(CoreNotificationType.BINDING_REMOVED.toString(), received.getType());
+
+      // Round 2: the same create/destroy cycle, this time through the management control.
+      final HornetQServerControl serverControl = ManagementControlHelper.createHornetQServerControl(mbeanServer);
+
+      serverControl.createQueue(controlQueueName, controlQueueName);
+
+      received = listener.getNotification();
+      System.out.println("got notif: " + received);
+      assertEquals(CoreNotificationType.BINDING_ADDED.toString(), received.getType());
+
+      serverControl.destroyQueue(controlQueueName);
+
+      received = listener.getNotification();
+      System.out.println("got notif: " + received);
+      assertEquals(CoreNotificationType.BINDING_REMOVED.toString(), received.getType());
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -1863,10 +2001,8 @@ public class QueueControlTest extends ManagementTestBase
    {
       super.setUp();
 
-      Configuration conf = createBasicConfig();
-      conf.setSecurityEnabled(false);
-      conf.setJMXManagementEnabled(true);
-      conf.getAcceptorConfigurations().add(new 
TransportConfiguration(INVM_ACCEPTOR_FACTORY));
+      Configuration conf = createBasicConfig()
+         .addAcceptorConfiguration(new 
TransportConfiguration(INVM_ACCEPTOR_FACTORY));
       server = addServer(HornetQServers.newHornetQServer(conf, mbeanServer, 
false));
       server.start();
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
index 852da9f..4d6629b 100644
--- 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
+++ 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
@@ -41,6 +41,19 @@ public class QueueControlUsingCoreTest extends 
QueueControlTest
       {
          private final CoreMessagingProxy proxy = new 
CoreMessagingProxy(session, ResourceNames.CORE_QUEUE + queue);
 
+         /**
+          * Invokes the "flushExecutor" operation through the core messaging proxy.
+          * Any failure is rethrown as a RuntimeException (with the original as its cause)
+          * because this method declares no checked exceptions.
+          */
+         @Override
+         public void flushExecutor()
+         {
+            try
+            {
+               proxy.invokeOperation("flushExecutor");
+            }
+            catch (Exception failure)
+            {
+               throw new RuntimeException(failure.getMessage(), failure);
+            }
+         }
+
          public boolean changeMessagePriority(final long messageID, final int 
newPriority) throws Exception
          {
             return (Boolean) proxy.invokeOperation("changeMessagePriority", 
messageID, newPriority);
@@ -106,11 +119,21 @@ public class QueueControlUsingCoreTest extends 
QueueControlTest
             return (Integer) proxy.retrieveAttributeValue("messagesAdded");
          }
 
+         /**
+          * Reads the "messagesAcknowledged" attribute through the core messaging proxy.
+          *
+          * @return the attribute value widened to {@code long}
+          */
+         public long getMessagesAcknowledged()
+         {
+            // Cast to Number rather than Integer: the method returns long, and a hard
+            // Integer cast would throw ClassCastException if the proxy hands back a Long.
+            return ((Number) proxy.retrieveAttributeValue("messagesAcknowledged")).longValue();
+         }
+
          public void resetMessagesAdded() throws Exception
          {
             proxy.invokeOperation("resetMessagesAdded");
          }
 
+         /** Invokes the "resetMessagesAcknowledged" operation through the core messaging proxy. */
+         public void resetMessagesAcknowledged() throws Exception
+         {
+            proxy.invokeOperation("resetMessagesAcknowledged");
+         }
+
          public String getName()
          {
             return (String) proxy.retrieveAttributeValue("name");

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithConfiguredAdminUserTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithConfiguredAdminUserTest.java
 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithConfiguredAdminUserTest.java
index b0ba06e..19efdaf 100644
--- 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithConfiguredAdminUserTest.java
+++ 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithConfiguredAdminUserTest.java
@@ -88,9 +88,9 @@ public class SecurityManagementWithConfiguredAdminUserTest 
extends SecurityManag
    @Override
    protected HornetQServer setupAndStartHornetQServer() throws Exception
    {
-      Configuration conf = createBasicConfig();
-      conf.setSecurityEnabled(true);
-      conf.getAcceptorConfigurations().add(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()));
+      Configuration conf = createBasicConfig()
+         .setSecurityEnabled(true)
+         .addAcceptorConfiguration(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()));
       HornetQServer server = addServer(HornetQServers.newHornetQServer(conf, 
false));
       server.start();
       HierarchicalRepository<Set<Role>> securityRepository = 
server.getSecurityRepository();

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java
 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java
index 2015d27..36a0889 100644
--- 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java
+++ 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithDefaultConfigurationTest.java
@@ -56,10 +56,10 @@ public class SecurityManagementWithDefaultConfigurationTest 
extends SecurityMana
    @Override
    protected HornetQServer setupAndStartHornetQServer() throws Exception
    {
-      Configuration conf = createBasicConfig();
-      
conf.setClusterPassword(HornetQDefaultConfiguration.getDefaultClusterPassword());
-      conf.setSecurityEnabled(true);
-      conf.getAcceptorConfigurations().add(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()));
+      Configuration conf = createBasicConfig()
+         
.setClusterPassword(HornetQDefaultConfiguration.getDefaultClusterPassword())
+         .setSecurityEnabled(true)
+         .addAcceptorConfiguration(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()));
       HornetQServer server = addServer(HornetQServers.newHornetQServer(conf, 
false));
       server.start();
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithModifiedConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithModifiedConfigurationTest.java
 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithModifiedConfigurationTest.java
index c6e6422..dbb0a36 100644
--- 
a/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithModifiedConfigurationTest.java
+++ 
b/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/SecurityManagementWithModifiedConfigurationTest.java
@@ -74,10 +74,10 @@ public class 
SecurityManagementWithModifiedConfigurationTest extends SecurityMan
    @Override
    protected HornetQServer setupAndStartHornetQServer() throws Exception
    {
-      ConfigurationImpl conf = createBasicConfig();
-      conf.setSecurityEnabled(true);
-      conf.setClusterPassword(configuredClusterPassword);
-      conf.getAcceptorConfigurations().add(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()));
+      ConfigurationImpl conf = createBasicConfig()
+         .setSecurityEnabled(true)
+         .setClusterPassword(configuredClusterPassword)
+         .addAcceptorConfiguration(new 
TransportConfiguration(InVMAcceptorFactory.class.getName()));
       HornetQServer server = HornetQServers.newHornetQServer(conf, false);
       server.start();
 

Reply via email to