http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java
 
b/hornetq-server/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java
index ae5b41f..bed8d0a 100644
--- 
a/hornetq-server/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java
+++ 
b/hornetq-server/src/main/java/org/hornetq/core/deployers/impl/FileConfigurationParser.java
@@ -32,13 +32,20 @@ import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.UDPBroadcastGroupConfiguration;
 import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.core.config.BackupStrategy;
 import org.hornetq.core.config.BridgeConfiguration;
 import org.hornetq.core.config.ClusterConnectionConfiguration;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.ConnectorServiceConfiguration;
 import org.hornetq.core.config.CoreQueueConfiguration;
 import org.hornetq.core.config.DivertConfiguration;
+import org.hornetq.core.config.HAPolicyConfiguration;
+import org.hornetq.core.config.ScaleDownConfiguration;
+import org.hornetq.core.config.ha.ColocatedPolicyConfiguration;
+import org.hornetq.core.config.ha.LiveOnlyPolicyConfiguration;
+import org.hornetq.core.config.ha.ReplicaPolicyConfiguration;
+import org.hornetq.core.config.ha.ReplicatedPolicyConfiguration;
+import org.hornetq.core.config.ha.SharedStoreMasterPolicyConfiguration;
+import org.hornetq.core.config.ha.SharedStoreSlavePolicyConfiguration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.config.impl.FileConfiguration;
 import org.hornetq.core.config.impl.Validators;
@@ -47,11 +54,10 @@ import org.hornetq.core.journal.impl.JournalConstants;
 import org.hornetq.core.security.Role;
 import org.hornetq.core.server.HornetQServerLogger;
 import org.hornetq.core.server.JournalType;
-import org.hornetq.core.server.cluster.ha.HAPolicy;
-import org.hornetq.core.server.cluster.ha.HAPolicyTemplate;
 import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
 import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
 import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.settings.impl.SlowConsumerPolicy;
 import org.hornetq.utils.DefaultSensitiveStringCodec;
 import org.hornetq.utils.PasswordMaskingUtil;
 import org.hornetq.utils.SensitiveDataCodec;
@@ -136,6 +142,12 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
 
    private static final String SEND_TO_DLA_ON_NO_ROUTE = 
"send-to-dla-on-no-route";
 
+   private static final String SLOW_CONSUMER_THRESHOLD_NODE_NAME = 
"slow-consumer-threshold";
+
+   private static final String SLOW_CONSUMER_CHECK_PERIOD_NODE_NAME = 
"slow-consumer-check-period";
+
+   private static final String SLOW_CONSUMER_POLICY_NODE_NAME = 
"slow-consumer-policy";
+
    // Attributes ----------------------------------------------------
 
    private boolean validateAIO = false;
@@ -195,6 +207,57 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
          HornetQServerLogger.LOGGER.deprecatedConfigurationOption("clustered");
       }
 
+      // these are combined because they are both required for setting the 
correct HAPolicyConfiguration
+      if (parameterExists(e, "backup") || parameterExists(e, "shared-store"))
+      {
+         boolean backup = getBoolean(e, "backup", false);
+         boolean sharedStore = getBoolean(e, "shared-store", true);
+
+         if (containsHAPolicy)
+         {
+            if (parameterExists(e, "backup"))
+            {
+               HornetQServerLogger.LOGGER.incompatibleWithHAPolicy("backup");
+            }
+
+            if (parameterExists(e, "shared-store"))
+            {
+               
HornetQServerLogger.LOGGER.incompatibleWithHAPolicy("shared-store");
+            }
+         }
+         else
+         {
+            if (parameterExists(e, "backup"))
+            {
+               
HornetQServerLogger.LOGGER.deprecatedConfigurationOption("backup");
+            }
+
+            if (parameterExists(e, "shared-store"))
+            {
+               
HornetQServerLogger.LOGGER.deprecatedConfigurationOption("shared-store");
+            }
+
+            if (backup && sharedStore)
+            {
+               config.setHAPolicyConfiguration(new 
SharedStoreSlavePolicyConfiguration());
+            }
+            else if (backup && !sharedStore)
+            {
+               config.setHAPolicyConfiguration(new 
ReplicaPolicyConfiguration());
+            }
+            else if (!backup && sharedStore)
+            {
+               config.setHAPolicyConfiguration(new 
SharedStoreMasterPolicyConfiguration());
+            }
+            else if (!backup && !sharedStore)
+            {
+               config.setHAPolicyConfiguration(new 
ReplicatedPolicyConfiguration());
+            }
+         }
+      }
+
+      HAPolicyConfiguration haPolicyConfig = config.getHAPolicyConfiguration();
+
       if (parameterExists(e, "check-for-live-server"))
       {
          if (containsHAPolicy)
@@ -205,7 +268,11 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
          {
             
HornetQServerLogger.LOGGER.deprecatedConfigurationOption("check-for-live-server");
 
-            config.getHAPolicy().setCheckForLiveServer(getBoolean(e, 
"check-for-live-server", config.getHAPolicy().isCheckForLiveServer()));
+            if (haPolicyConfig instanceof ReplicatedPolicyConfiguration)
+            {
+               ReplicatedPolicyConfiguration hapc = 
(ReplicatedPolicyConfiguration) haPolicyConfig;
+               hapc.setCheckForLiveServer(getBoolean(e, 
"check-for-live-server", hapc.isCheckForLiveServer()));
+            }
          }
       }
 
@@ -219,7 +286,20 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
          {
             
HornetQServerLogger.LOGGER.deprecatedConfigurationOption("allow-failback");
 
-            config.getHAPolicy().setAllowAutoFailBack(getBoolean(e, 
"allow-failback", config.getHAPolicy().isAllowAutoFailBack()));
+            if (haPolicyConfig instanceof ReplicaPolicyConfiguration)
+            {
+               ReplicaPolicyConfiguration hapc = (ReplicaPolicyConfiguration) 
haPolicyConfig;
+               hapc.setAllowFailBack(getBoolean(e, "allow-failback", 
hapc.isAllowFailBack()));
+            }
+            else if (haPolicyConfig instanceof 
SharedStoreSlavePolicyConfiguration)
+            {
+               SharedStoreSlavePolicyConfiguration hapc = 
(SharedStoreSlavePolicyConfiguration) haPolicyConfig;
+               hapc.setAllowFailBack(getBoolean(e, "allow-failback", 
hapc.isAllowFailBack()));
+            }
+            else
+            {
+               
HornetQServerLogger.LOGGER.incompatibleWithHAPolicyChosen("allow-failback");
+            }
          }
       }
 
@@ -233,7 +313,20 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
          {
             
HornetQServerLogger.LOGGER.deprecatedConfigurationOption("backup-group-name");
 
-            config.getHAPolicy().setBackupGroupName(getString(e, 
"backup-group-name", config.getHAPolicy().getBackupGroupName(), 
Validators.NO_CHECK));
+            if (haPolicyConfig instanceof ReplicaPolicyConfiguration)
+            {
+               ReplicaPolicyConfiguration hapc = (ReplicaPolicyConfiguration) 
haPolicyConfig;
+               hapc.setGroupName(getString(e, "backup-group-name", 
hapc.getGroupName(), Validators.NO_CHECK));
+            }
+            else if (haPolicyConfig instanceof ReplicatedPolicyConfiguration)
+            {
+               ReplicatedPolicyConfiguration hapc = 
(ReplicatedPolicyConfiguration) haPolicyConfig;
+               hapc.setGroupName(getString(e, "backup-group-name", 
hapc.getGroupName(), Validators.NO_CHECK));
+            }
+            else
+            {
+               
HornetQServerLogger.LOGGER.incompatibleWithHAPolicyChosen("backup-group-name");
+            }
          }
       }
 
@@ -247,7 +340,25 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
          {
             
HornetQServerLogger.LOGGER.deprecatedConfigurationOption("failback-delay");
 
-            config.getHAPolicy().setFailbackDelay(getLong(e, "failback-delay", 
config.getHAPolicy().getFailbackDelay(), Validators.GT_ZERO));
+            if (haPolicyConfig instanceof ReplicaPolicyConfiguration)
+            {
+               ReplicaPolicyConfiguration hapc = (ReplicaPolicyConfiguration) 
haPolicyConfig;
+               hapc.setFailbackDelay(getLong(e, "failback-delay", 
hapc.getFailbackDelay(), Validators.GT_ZERO));
+            }
+            else if (haPolicyConfig instanceof 
SharedStoreMasterPolicyConfiguration)
+            {
+               SharedStoreMasterPolicyConfiguration hapc = 
(SharedStoreMasterPolicyConfiguration) haPolicyConfig;
+               hapc.setFailbackDelay(getLong(e, "failback-delay", 
hapc.getFailbackDelay(), Validators.GT_ZERO));
+            }
+            else if (haPolicyConfig instanceof 
SharedStoreSlavePolicyConfiguration)
+            {
+               SharedStoreSlavePolicyConfiguration hapc = 
(SharedStoreSlavePolicyConfiguration) haPolicyConfig;
+               hapc.setFailbackDelay(getLong(e, "failback-delay", 
hapc.getFailbackDelay(), Validators.GT_ZERO));
+            }
+            else
+            {
+               
HornetQServerLogger.LOGGER.incompatibleWithHAPolicyChosen("failback-delay");
+            }
          }
       }
 
@@ -261,7 +372,20 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
          {
             
HornetQServerLogger.LOGGER.deprecatedConfigurationOption("failover-on-shutdown");
 
-            config.getHAPolicy().setFailoverOnServerShutdown(getBoolean(e, 
"failover-on-shutdown", config.getHAPolicy().isFailoverOnServerShutdown()));
+            if (haPolicyConfig instanceof SharedStoreMasterPolicyConfiguration)
+            {
+               SharedStoreMasterPolicyConfiguration hapc = 
(SharedStoreMasterPolicyConfiguration) haPolicyConfig;
+               hapc.setFailoverOnServerShutdown(getBoolean(e, 
"failover-on-shutdown", hapc.isFailoverOnServerShutdown()));
+            }
+            else if (haPolicyConfig instanceof 
SharedStoreSlavePolicyConfiguration)
+            {
+               SharedStoreSlavePolicyConfiguration hapc = 
(SharedStoreSlavePolicyConfiguration) haPolicyConfig;
+               hapc.setFailoverOnServerShutdown(getBoolean(e, 
"failover-on-shutdown", hapc.isFailoverOnServerShutdown()));
+            }
+            else
+            {
+               
HornetQServerLogger.LOGGER.incompatibleWithHAPolicyChosen("failover-on-shutdown");
+            }
          }
       }
 
@@ -275,21 +399,20 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
          {
             
HornetQServerLogger.LOGGER.deprecatedConfigurationOption("replication-clustername");
 
-            config.getHAPolicy().setReplicationClustername(getString(e, 
"replication-clustername", null, Validators.NO_CHECK));
-         }
-      }
-
-      if (parameterExists(e, "scale-down-clustername"))
-      {
-         if (containsHAPolicy)
-         {
-            
HornetQServerLogger.LOGGER.incompatibleWithHAPolicy("scale-down-clustername");
-         }
-         else
-         {
-            
HornetQServerLogger.LOGGER.deprecatedConfigurationOption("scale-down-clustername");
-
-            config.getHAPolicy().setScaleDownClustername(getString(e, 
"scale-down-clustername", null, Validators.NO_CHECK));
+            if (haPolicyConfig instanceof ReplicaPolicyConfiguration)
+            {
+               ReplicaPolicyConfiguration hapc = (ReplicaPolicyConfiguration) 
haPolicyConfig;
+               hapc.setClusterName(getString(e, "replication-clustername", 
null, Validators.NO_CHECK));
+            }
+            else if (haPolicyConfig instanceof ReplicatedPolicyConfiguration)
+            {
+               ReplicatedPolicyConfiguration hapc = 
(ReplicatedPolicyConfiguration) haPolicyConfig;
+               hapc.setClusterName(getString(e, "replication-clustername", 
null, Validators.NO_CHECK));
+            }
+            else
+            {
+               
HornetQServerLogger.LOGGER.incompatibleWithHAPolicyChosen("replication-clustername");
+            }
          }
       }
 
@@ -303,60 +426,27 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
          {
             
HornetQServerLogger.LOGGER.deprecatedConfigurationOption("max-saved-replicated-journals-size");
 
-            
config.getHAPolicy().setMaxSavedReplicatedJournalSize(getInteger(e, 
"max-saved-replicated-journals-size",
-                                                                             
config.getHAPolicy().getMaxSavedReplicatedJournalsSize(), 
Validators.MINUS_ONE_OR_GE_ZERO));
-         }
-      }
-
-      // these are combined because they are both required for setting the 
HAPolicy
-      if (parameterExists(e, "backup") || parameterExists(e, "shared-store"))
-      {
-         boolean backup = getBoolean(e, "backup", 
config.getHAPolicy().isBackup());
-         boolean sharedStore = getBoolean(e, "shared-store", 
config.getHAPolicy().isSharedStore());
-
-         if (containsHAPolicy)
-         {
-            if (parameterExists(e, "backup"))
-            {
-               HornetQServerLogger.LOGGER.incompatibleWithHAPolicy("backup");
-            }
-
-            if (parameterExists(e, "shared-store"))
-            {
-               
HornetQServerLogger.LOGGER.incompatibleWithHAPolicy("shared-store");
-            }
-         }
-         else
-         {
-            if (parameterExists(e, "backup"))
-            {
-               
HornetQServerLogger.LOGGER.deprecatedConfigurationOption("backup");
-            }
-
-            if (parameterExists(e, "shared-store"))
+            if (haPolicyConfig instanceof ReplicaPolicyConfiguration)
             {
-               
HornetQServerLogger.LOGGER.deprecatedConfigurationOption("shared-store");
-            }
+               ReplicaPolicyConfiguration hapc = (ReplicaPolicyConfiguration) 
haPolicyConfig;
+               hapc.setMaxSavedReplicatedJournalsSize(getInteger(e, 
"max-saved-replicated-journals-size",
+                     hapc.getMaxSavedReplicatedJournalsSize(), 
Validators.MINUS_ONE_OR_GE_ZERO));
 
-            if (backup && sharedStore)
-            {
-               
config.setHAPolicy(HAPolicyTemplate.BACKUP_SHARED_STORE.getHaPolicy());
-            }
-            else if (backup && !sharedStore)
-            {
-               
config.setHAPolicy(HAPolicyTemplate.BACKUP_REPLICATED.getHaPolicy());
-            }
-            else if (!backup && sharedStore)
-            {
-               config.setHAPolicy(HAPolicyTemplate.SHARED_STORE.getHaPolicy());
             }
-            else if (!backup && !sharedStore)
+            else
             {
-               config.setHAPolicy(HAPolicyTemplate.REPLICATED.getHaPolicy());
+               
HornetQServerLogger.LOGGER.incompatibleWithHAPolicyChosen("max-saved-replicated-journals-size");
             }
          }
       }
 
+      // if no HA policy configuration has been set by now, fall back to the default (live-only)
+      if (config.getHAPolicyConfiguration() == null)
+      {
+         config.setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration());
+      }
+
+
       config.setResolveProtocols(getBoolean(e, "resolve-protocols", 
config.isResolveProtocols()));
 
       // Defaults to true when using FileConfiguration
@@ -1023,6 +1113,28 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
          {
             addressSettings.setSendToDLAOnNoRoute(XMLUtil.parseBoolean(child));
          }
+         else if (SLOW_CONSUMER_THRESHOLD_NODE_NAME.equalsIgnoreCase(name))
+         {
+            long slowConsumerThreshold = XMLUtil.parseLong(child);
+            
Validators.MINUS_ONE_OR_GT_ZERO.validate(SLOW_CONSUMER_THRESHOLD_NODE_NAME, 
slowConsumerThreshold);
+
+            addressSettings.setSlowConsumerThreshold(slowConsumerThreshold);
+         }
+         else if (SLOW_CONSUMER_CHECK_PERIOD_NODE_NAME.equalsIgnoreCase(name))
+         {
+            long slowConsumerCheckPeriod = XMLUtil.parseLong(child);
+            Validators.GT_ZERO.validate(SLOW_CONSUMER_CHECK_PERIOD_NODE_NAME, 
slowConsumerCheckPeriod);
+
+            
addressSettings.setSlowConsumerCheckPeriod(slowConsumerCheckPeriod);
+         }
+         else if (SLOW_CONSUMER_POLICY_NODE_NAME.equalsIgnoreCase(name))
+         {
+            String value = getTrimmedTextContent(child);
+            
Validators.SLOW_CONSUMER_POLICY_TYPE.validate(SLOW_CONSUMER_POLICY_NODE_NAME,
+                                                                 value);
+            SlowConsumerPolicy policy = Enum.valueOf(SlowConsumerPolicy.class, 
value);
+            addressSettings.setSlowConsumerPolicy(policy);
+         }
       }
       return setting;
    }
@@ -1054,7 +1166,11 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
          }
       }
 
-      return new CoreQueueConfiguration(address, name, filterString, durable);
+      return new CoreQueueConfiguration()
+         .setAddress(address)
+         .setName(name)
+         .setFilterString(filterString)
+         .setDurable(durable);
    }
 
    private TransportConfiguration parseTransportConfiguration(final Element e, 
final Configuration mainConfig)
@@ -1097,74 +1213,180 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
       return new TransportConfiguration(clazz, params, name);
    }
 
+   private static final ArrayList<String> POLICY_LIST = new ArrayList<>();
+   static
+   {
+      POLICY_LIST.add("colocated");
+      POLICY_LIST.add("live-only");
+      POLICY_LIST.add("replicated");
+      POLICY_LIST.add("replica");
+      POLICY_LIST.add("shared-store-master");
+      POLICY_LIST.add("shared-store-slave");
+   }
+   private static final ArrayList<String> HA_LIST = new ArrayList<>();
+   static
+   {
+      HA_LIST.add("live-only");
+      HA_LIST.add("shared-store");
+      HA_LIST.add("replication");
+   }
    private void parseHAPolicyConfiguration(final Element e, final 
Configuration mainConfig)
    {
-      String policyTemplate = e.getAttribute("template");
-      HAPolicy policy;
-      if (policyTemplate.length() > 0)
-      {
-         policy = HAPolicyTemplate.valueOf(policyTemplate).getHaPolicy();
-      }
-      else
+      for (String haType : HA_LIST)
       {
-         policy = HAPolicyTemplate.NONE.getHaPolicy();
+         NodeList haNodeList = e.getElementsByTagName(haType);
+         if (haNodeList.getLength() > 0)
+         {
+            Element haNode = (Element) haNodeList.item(0);
+            if (haNode.getTagName().equals("replication"))
+            {
+               NodeList masterNodeList = e.getElementsByTagName("master");
+               if (masterNodeList.getLength() > 0)
+               {
+                  Element masterNode = (Element) masterNodeList.item(0);
+                  
mainConfig.setHAPolicyConfiguration(createReplicatedHaPolicy(masterNode));
+               }
+               NodeList slaveNodeList = e.getElementsByTagName("slave");
+               if (slaveNodeList.getLength() > 0)
+               {
+                  Element slaveNode = (Element) slaveNodeList.item(0);
+                  
mainConfig.setHAPolicyConfiguration(createReplicaHaPolicy(slaveNode));
+               }
+               NodeList colocatedNodeList = 
e.getElementsByTagName("colocated");
+               if (colocatedNodeList.getLength() > 0)
+               {
+                  Element colocatedNode = (Element) colocatedNodeList.item(0);
+                  
mainConfig.setHAPolicyConfiguration(createColocatedHaPolicy(colocatedNode, 
true));
+               }
+            }
+            else if (haNode.getTagName().equals("shared-store"))
+            {
+               NodeList masterNodeList = e.getElementsByTagName("master");
+               if (masterNodeList.getLength() > 0)
+               {
+                  Element masterNode = (Element) masterNodeList.item(0);
+                  
mainConfig.setHAPolicyConfiguration(createSharedStoreMasterHaPolicy(masterNode));
+               }
+               NodeList slaveNodeList = e.getElementsByTagName("slave");
+               if (slaveNodeList.getLength() > 0)
+               {
+                  Element slaveNode = (Element) slaveNodeList.item(0);
+                  
mainConfig.setHAPolicyConfiguration(createSharedStoreSlaveHaPolicy(slaveNode));
+               }
+               NodeList colocatedNodeList = 
e.getElementsByTagName("colocated");
+               if (colocatedNodeList.getLength() > 0)
+               {
+                  Element colocatedNode = (Element) colocatedNodeList.item(0);
+                  
mainConfig.setHAPolicyConfiguration(createColocatedHaPolicy(colocatedNode, 
false));
+               }
+            }
+            else if (haNode.getTagName().equals("live-only"))
+            {
+               NodeList noneNodeList = e.getElementsByTagName("live-only");
+               Element noneNode = (Element) noneNodeList.item(0);
+               
mainConfig.setHAPolicyConfiguration(createLiveOnlyHaPolicy(noneNode));
+            }
+         }
       }
-      mainConfig.setHAPolicy(policy);
+   }
 
-      String policyType = getString(e, "policy-type", 
policy.getPolicyType().toString(), Validators.NOT_NULL_OR_EMPTY);
+   private LiveOnlyPolicyConfiguration createLiveOnlyHaPolicy(Element 
policyNode)
+   {
+      LiveOnlyPolicyConfiguration configuration = new 
LiveOnlyPolicyConfiguration();
 
-      policy.setPolicyType(HAPolicy.POLICY_TYPE.valueOf(policyType));
+      
configuration.setScaleDownConfiguration(parseScaleDownConfig(policyNode));
 
-      boolean requestBackup = getBoolean(e, "request-backup", 
policy.isRequestBackup());
+      return configuration;
+   }
 
-      policy.setRequestBackup(requestBackup);
+   private ReplicatedPolicyConfiguration createReplicatedHaPolicy(Element 
policyNode)
+   {
+      ReplicatedPolicyConfiguration configuration = new 
ReplicatedPolicyConfiguration();
 
-      int backupRequestRetries = getInteger(e, "backup-request-retries", 
policy.getBackupRequestRetries(), Validators.MINUS_ONE_OR_GE_ZERO);
+      configuration.setCheckForLiveServer(getBoolean(policyNode, 
"check-for-live-server", configuration.isCheckForLiveServer()));
 
-      policy.setBackupRequestRetries(backupRequestRetries);
+      configuration.setGroupName(getString(policyNode, "group-name", 
configuration.getGroupName(), Validators.NO_CHECK));
 
-      long backupRequestRetryInterval = getLong(e, 
"backup-request-retry-interval", policy.getBackupRequestRetryInterval(), 
Validators.GT_ZERO);
+      configuration.setClusterName(getString(policyNode, "cluster-name", 
configuration.getClusterName(), Validators.NO_CHECK));
 
-      policy.setBackupRequestRetryInterval(backupRequestRetryInterval);
+      return configuration;
+   }
 
-      int maxBackups = getInteger(e, "max-backups", policy.getMaxBackups(), 
Validators.GE_ZERO);
+   private ReplicaPolicyConfiguration createReplicaHaPolicy(Element policyNode)
+   {
+      ReplicaPolicyConfiguration configuration = new 
ReplicaPolicyConfiguration();
 
-      policy.setMaxBackups(maxBackups);
+      configuration.setRestartBackup(getBoolean(policyNode, "restart-backup", 
configuration.isRestartBackup()));
 
-      int backupPortOffset = getInteger(e, "backup-port-offset", 
policy.getBackupPortOffset(), Validators.GT_ZERO);
+      configuration.setGroupName(getString(policyNode, "group-name", 
configuration.getGroupName(), Validators.NO_CHECK));
 
-      policy.setBackupPortOffset(backupPortOffset);
+      configuration.setAllowFailBack(getBoolean(policyNode, "allow-failback", 
configuration.isAllowFailBack()));
 
-      String backupStrategy = getString(e, "backup-strategy", 
policy.getBackupStrategy().toString(), Validators.NOT_NULL_OR_EMPTY);
+      configuration.setFailbackDelay(getLong(policyNode, "failback-delay", 
configuration.getFailbackDelay(), Validators.GT_ZERO));
 
-      policy.setBackupStrategy(BackupStrategy.valueOf(backupStrategy));
+      configuration.setClusterName(getString(policyNode, "cluster-name", 
configuration.getClusterName(), Validators.NO_CHECK));
 
-      String scaleDownDiscoveryGroup = getString(e, 
"scale-down-discovery-group", policy.getScaleDownDiscoveryGroup(), 
Validators.NO_CHECK);
+      configuration.setMaxSavedReplicatedJournalsSize(getInteger(policyNode, 
"max-saved-replicated-journals-size",
+            configuration.getMaxSavedReplicatedJournalsSize(), 
Validators.MINUS_ONE_OR_GE_ZERO));
 
-      policy.setScaleDownDiscoveryGroup(scaleDownDiscoveryGroup);
+      
configuration.setScaleDownConfiguration(parseScaleDownConfig(policyNode));
 
-      String scaleDownDiscoveryGroupName = getString(e, 
"scale-down-group-name", policy.getScaleDownGroupName(), Validators.NO_CHECK);
+      return configuration;
+   }
 
-      policy.setScaleDownGroupName(scaleDownDiscoveryGroupName);
+   private SharedStoreMasterPolicyConfiguration 
createSharedStoreMasterHaPolicy(Element policyNode)
+   {
+      SharedStoreMasterPolicyConfiguration configuration = new 
SharedStoreMasterPolicyConfiguration();
 
-      NodeList scaleDownConnectorNode = 
e.getElementsByTagName("scale-down-connectors");
+      configuration.setFailoverOnServerShutdown(getBoolean(policyNode, 
"failover-on-shutdown", configuration.isFailoverOnServerShutdown()));
 
-      if (scaleDownConnectorNode != null && scaleDownConnectorNode.getLength() 
> 0)
-      {
-         NodeList scaleDownConnectors = 
scaleDownConnectorNode.item(0).getChildNodes();
-         for (int i = 0; i < scaleDownConnectors.getLength(); i++)
-         {
-            Node child = scaleDownConnectors.item(i);
-            if (child.getNodeName().equals("connector-ref"))
-            {
-               String connectorName = getTrimmedTextContent(child);
+      configuration.setFailbackDelay(getLong(policyNode, "failback-delay", 
configuration.getFailbackDelay(), Validators.GT_ZERO));
 
-               policy.getScaleDownConnectors().add(connectorName);
-            }
-         }
-      }
+      return configuration;
+   }
+
+   private SharedStoreSlavePolicyConfiguration 
createSharedStoreSlaveHaPolicy(Element policyNode)
+   {
+      SharedStoreSlavePolicyConfiguration configuration = new 
SharedStoreSlavePolicyConfiguration();
+
+      configuration.setAllowFailBack(getBoolean(policyNode, "allow-failback", 
configuration.isAllowFailBack()));
 
-      NodeList remoteConnectorNode = 
e.getElementsByTagName("remote-connectors");
+      configuration.setFailoverOnServerShutdown(getBoolean(policyNode, 
"failover-on-shutdown", configuration.isFailoverOnServerShutdown()));
+
+      configuration.setFailbackDelay(getLong(policyNode, "failback-delay", 
configuration.getFailbackDelay(), Validators.GT_ZERO));
+
+      configuration.setRestartBackup(getBoolean(policyNode, "restart-backup", 
configuration.isRestartBackup()));
+
+      
configuration.setScaleDownConfiguration(parseScaleDownConfig(policyNode));
+
+      return configuration;
+   }
+
+   private ColocatedPolicyConfiguration createColocatedHaPolicy(Element 
policyNode, boolean replicated)
+   {
+      ColocatedPolicyConfiguration configuration = new 
ColocatedPolicyConfiguration();
+
+      boolean requestBackup = getBoolean(policyNode, "request-backup", 
configuration.isRequestBackup());
+
+      configuration.setRequestBackup(requestBackup);
+
+      int backupRequestRetries = getInteger(policyNode, 
"backup-request-retries", configuration.getBackupRequestRetries(), 
Validators.MINUS_ONE_OR_GE_ZERO);
+
+      configuration.setBackupRequestRetries(backupRequestRetries);
+
+      long backupRequestRetryInterval = getLong(policyNode, 
"backup-request-retry-interval", configuration.getBackupRequestRetryInterval(), 
Validators.GT_ZERO);
+
+      configuration.setBackupRequestRetryInterval(backupRequestRetryInterval);
+
+      int maxBackups = getInteger(policyNode, "max-backups", 
configuration.getMaxBackups(), Validators.GE_ZERO);
+
+      configuration.setMaxBackups(maxBackups);
+
+      int backupPortOffset = getInteger(policyNode, "backup-port-offset", 
configuration.getBackupPortOffset(), Validators.GT_ZERO);
+
+      configuration.setBackupPortOffset(backupPortOffset);
+
+      NodeList remoteConnectorNode = 
policyNode.getElementsByTagName("excludes");
 
       if (remoteConnectorNode != null && remoteConnectorNode.getLength() > 0)
       {
@@ -1175,31 +1397,65 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
             if (child.getNodeName().equals("connector-ref"))
             {
                String connectorName = getTrimmedTextContent(child);
-               policy.getRemoteConnectors().add(connectorName);
+               configuration.getExcludedConnectors().add(connectorName);
             }
          }
       }
 
-      policy.setScaleDown(getBoolean(e, "scale-down", policy.isScaleDown()));
+      NodeList masterNodeList = policyNode.getElementsByTagName("master");
+      if (masterNodeList.getLength() > 0)
+      {
+         Element masterNode = (Element) masterNodeList.item(0);
+         configuration.setLiveConfig(replicated ? 
createReplicatedHaPolicy(masterNode) : 
createSharedStoreMasterHaPolicy(masterNode));
+      }
+      NodeList slaveNodeList = policyNode.getElementsByTagName("slave");
+      if (slaveNodeList.getLength() > 0)
+      {
+         Element slaveNode = (Element) slaveNodeList.item(0);
+         configuration.setBackupConfig(replicated ? 
createReplicaHaPolicy(slaveNode) : createSharedStoreSlaveHaPolicy(slaveNode));
+      }
 
-      policy.setScaleDownGroupName(getString(e, "scale-down-group-name", 
policy.getScaleDownGroupName(), Validators.NO_CHECK));
+      return configuration;
+   }
+   private ScaleDownConfiguration parseScaleDownConfig(Element policyNode)
+   {
+      NodeList scaleDownNode = policyNode.getElementsByTagName("scale-down");
 
-      policy.setCheckForLiveServer(getBoolean(e, "check-for-live-server", 
policy.isCheckForLiveServer()));
+      if (scaleDownNode.getLength() > 0)
+      {
+         ScaleDownConfiguration scaleDownConfiguration = new 
ScaleDownConfiguration();
 
-      policy.setAllowAutoFailBack(getBoolean(e, "allow-failback", 
policy.isCheckForLiveServer()));
+         Element scaleDownElement = (Element) scaleDownNode.item(0);
 
-      policy.setFailoverOnServerShutdown(getBoolean(e, "failover-on-shutdown", 
policy.isFailoverOnServerShutdown()));
+         scaleDownConfiguration.setEnabled(getBoolean(scaleDownElement, 
"enabled", scaleDownConfiguration.isEnabled()));
 
-      policy.setBackupGroupName(getString(e, "backup-group-name", 
policy.getBackupGroupName(), Validators.NO_CHECK));
+         String scaleDownDiscoveryGroup = getString(scaleDownElement, 
"discovery-group", scaleDownConfiguration.getDiscoveryGroup(), 
Validators.NO_CHECK);
 
-      policy.setFailbackDelay(getLong(e, "failback-delay", 
policy.getFailbackDelay(), Validators.GT_ZERO));
+         scaleDownConfiguration.setDiscoveryGroup(scaleDownDiscoveryGroup);
 
-      policy.setReplicationClustername(getString(e, "replication-clustername", 
null, Validators.NO_CHECK));
+         String scaleDownDiscoveryGroupName = getString(scaleDownElement, 
"group-name", scaleDownConfiguration.getGroupName(), Validators.NO_CHECK);
 
-      policy.setScaleDownClustername(getString(e, "scale-down-clustername", 
null, Validators.NO_CHECK));
+         scaleDownConfiguration.setGroupName(scaleDownDiscoveryGroupName);
 
-      policy.setMaxSavedReplicatedJournalSize(getInteger(e, 
"max-saved-replicated-journals-size",
-                                                         
policy.getMaxSavedReplicatedJournalsSize(), Validators.MINUS_ONE_OR_GE_ZERO));
+         NodeList scaleDownConnectorNode = 
scaleDownElement.getElementsByTagName("connectors");
+
+         if (scaleDownConnectorNode != null && 
scaleDownConnectorNode.getLength() > 0)
+         {
+            NodeList scaleDownConnectors = 
scaleDownConnectorNode.item(0).getChildNodes();
+            for (int i = 0; i < scaleDownConnectors.getLength(); i++)
+            {
+               Node child = scaleDownConnectors.item(i);
+               if (child.getNodeName().equals("connector-ref"))
+               {
+                  String connectorName = getTrimmedTextContent(child);
+
+                  scaleDownConfiguration.getConnectors().add(connectorName);
+               }
+            }
+         }
+         return scaleDownConfiguration;
+      }
+      return null;
    }
 
    private void parseBroadcastGroupConfiguration(final Element e, final 
Configuration mainConfig)
@@ -1251,10 +1507,18 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
       }
       else
       {
-         endpointFactoryConfiguration = new 
UDPBroadcastGroupConfiguration(groupAddress, groupPort, localAddress, 
localBindPort);
+         endpointFactoryConfiguration = new UDPBroadcastGroupConfiguration()
+            .setGroupAddress(groupAddress)
+            .setGroupPort(groupPort)
+            .setLocalBindAddress(localAddress)
+            .setLocalBindPort(localBindPort);
       }
 
-      BroadcastGroupConfiguration config = new 
BroadcastGroupConfiguration(name, broadcastPeriod, connectorNames, 
endpointFactoryConfiguration);
+      BroadcastGroupConfiguration config = new BroadcastGroupConfiguration()
+         .setName(name)
+         .setBroadcastPeriod(broadcastPeriod)
+         .setConnectorInfos(connectorNames)
+         .setEndpointFactoryConfiguration(endpointFactoryConfiguration);
 
       mainConfig.getBroadcastGroupConfigurations().add(config);
    }
@@ -1291,10 +1555,18 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
       }
       else
       {
-         endpointFactoryConfiguration = new 
UDPBroadcastGroupConfiguration(groupAddress, groupPort, localBindAddress, 
localBindPort);
+         endpointFactoryConfiguration = new UDPBroadcastGroupConfiguration()
+            .setGroupAddress(groupAddress)
+            .setGroupPort(groupPort)
+            .setLocalBindAddress(localBindAddress)
+            .setLocalBindPort(localBindPort);
       }
 
-      DiscoveryGroupConfiguration config = new 
DiscoveryGroupConfiguration(name, refreshTimeout, discoveryInitialWaitTimeout, 
endpointFactoryConfiguration);
+      DiscoveryGroupConfiguration config = new DiscoveryGroupConfiguration()
+         .setName(name)
+         .setRefreshTimeout(refreshTimeout)
+         .setDiscoveryInitialWaitTimeout(discoveryInitialWaitTimeout)
+         
.setBroadcastEndpointFactoryConfiguration(endpointFactoryConfiguration);
 
       if (mainConfig.getDiscoveryGroupConfigurations().containsKey(name))
       {
@@ -1357,7 +1629,7 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
 
 
       int confirmationWindowSize =
-         getInteger(e, "confirmation-window-size", 
FileConfiguration.DEFAULT_CONFIRMATION_WINDOW_SIZE,
+         getInteger(e, "confirmation-window-size", 
HornetQDefaultConfiguration.getDefaultClusterConfirmationWindowSize(),
                     Validators.GT_ZERO);
 
       long clusterNotificationInterval = getLong(e, "notification-interval", 
HornetQDefaultConfiguration.getDefaultClusterNotificationInterval(), 
Validators.GT_ZERO);
@@ -1393,44 +1665,35 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
          }
       }
 
-      ClusterConnectionConfiguration config;
+      ClusterConnectionConfiguration config = new 
ClusterConnectionConfiguration()
+         .setName(name)
+         .setAddress(address)
+         .setConnectorName(connectorName)
+         .setMinLargeMessageSize(minLargeMessageSize)
+         .setClientFailureCheckPeriod(clientFailureCheckPeriod)
+         .setConnectionTTL(connectionTTL)
+         .setRetryInterval(retryInterval)
+         .setRetryIntervalMultiplier(retryIntervalMultiplier)
+         .setMaxRetryInterval(maxRetryInterval)
+         .setInitialConnectAttempts(initialConnectAttempts)
+         .setReconnectAttempts(reconnectAttempts)
+         .setCallTimeout(callTimeout)
+         .setCallFailoverTimeout(callFailoverTimeout)
+         .setDuplicateDetection(duplicateDetection)
+         .setForwardWhenNoConsumers(forwardWhenNoConsumers)
+         .setMaxHops(maxHops)
+         .setConfirmationWindowSize(confirmationWindowSize)
+         .setAllowDirectConnectionsOnly(allowDirectConnectionsOnly)
+         .setClusterNotificationInterval(clusterNotificationInterval)
+         .setClusterNotificationAttempts(clusterNotificationAttempts);
 
       if (discoveryGroupName == null)
       {
-         config =
-            new ClusterConnectionConfiguration(name, address, connectorName,
-                                               minLargeMessageSize, 
clientFailureCheckPeriod, connectionTTL,
-                                               retryInterval, 
retryIntervalMultiplier, maxRetryInterval,
-                                               initialConnectAttempts, 
reconnectAttempts, callTimeout, callFailoverTimeout,
-                                               duplicateDetection, 
forwardWhenNoConsumers, maxHops,
-                                               confirmationWindowSize,
-                                               staticConnectorNames,
-                                               allowDirectConnectionsOnly,
-                                               clusterNotificationInterval,
-                                               clusterNotificationAttempts,
-                                               scaleDownConnector);
+         config.setStaticConnectors(staticConnectorNames);
       }
       else
       {
-         config =
-            new ClusterConnectionConfiguration(name, address, connectorName,
-                                               minLargeMessageSize, 
clientFailureCheckPeriod,
-                                               connectionTTL,
-                                               retryInterval,
-                                               retryIntervalMultiplier,
-                                               maxRetryInterval,
-                                               initialConnectAttempts,
-                                               reconnectAttempts,
-                                               callTimeout,
-                                               callFailoverTimeout,
-                                               duplicateDetection,
-                                               forwardWhenNoConsumers,
-                                               maxHops,
-                                               confirmationWindowSize,
-                                               discoveryGroupName,
-                                               clusterNotificationInterval,
-                                               clusterNotificationAttempts,
-                                               scaleDownConnector);
+         config.setDiscoveryGroupName(discoveryGroupName);
       }
 
       mainConfig.getClusterConfigurations().add(config);
@@ -1441,17 +1704,19 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
       String name = node.getAttribute("name");
       String type = getString(node, "type", null, 
Validators.NOT_NULL_OR_EMPTY);
       String address = getString(node, "address", null, 
Validators.NOT_NULL_OR_EMPTY);
-      Integer timeout = getInteger(node, "timeout", 
GroupingHandlerConfiguration.DEFAULT_TIMEOUT, Validators.GT_ZERO);
-      Long groupTimeout = getLong(node, "group-timeout", 
GroupingHandlerConfiguration.DEFAULT_GROUP_TIMEOUT, 
Validators.MINUS_ONE_OR_GT_ZERO);
-      Long reaperPeriod = getLong(node, "reaper-period", 
GroupingHandlerConfiguration.DEFAULT_REAPER_PERIOD, Validators.GT_ZERO);
-      mainConfiguration.setGroupingHandlerConfiguration(new 
GroupingHandlerConfiguration(new SimpleString(name),
-                                                                               
          type.equals(GroupingHandlerConfiguration.TYPE.LOCAL.getType())
-                                                                               
             ? GroupingHandlerConfiguration.TYPE.LOCAL
-                                                                               
             : GroupingHandlerConfiguration.TYPE.REMOTE,
-                                                                               
          new SimpleString(address),
-                                                                               
          timeout,
-                                                                               
          groupTimeout,
-                                                                               
          reaperPeriod));
+      Integer timeout = getInteger(node, "timeout", 
HornetQDefaultConfiguration.getDefaultGroupingHandlerTimeout(), 
Validators.GT_ZERO);
+      Long groupTimeout = getLong(node, "group-timeout", 
HornetQDefaultConfiguration.getDefaultGroupingHandlerGroupTimeout(), 
Validators.MINUS_ONE_OR_GT_ZERO);
+      Long reaperPeriod = getLong(node, "reaper-period", 
HornetQDefaultConfiguration.getDefaultGroupingHandlerReaperPeriod(), 
Validators.GT_ZERO);
+      mainConfiguration.setGroupingHandlerConfiguration(new 
GroupingHandlerConfiguration()
+                                                           .setName(new 
SimpleString(name))
+                                                           .setType(
+                                                              
type.equals(GroupingHandlerConfiguration.TYPE.LOCAL.getType())
+                                                                 ? 
GroupingHandlerConfiguration.TYPE.LOCAL
+                                                                 : 
GroupingHandlerConfiguration.TYPE.REMOTE)
+                                                           .setAddress(new 
SimpleString(address))
+                                                           .setTimeout(timeout)
+                                                           
.setGroupTimeout(groupTimeout)
+                                                           
.setReaperPeriod(reaperPeriod));
    }
 
    private void parseBridgeConfiguration(final Element brNode, final 
Configuration mainConfig) throws Exception
@@ -1466,7 +1731,7 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
 
       // Default bridge conf
       int confirmationWindowSize =
-         getInteger(brNode, "confirmation-window-size", 
FileConfiguration.DEFAULT_CONFIRMATION_WINDOW_SIZE,
+         getInteger(brNode, "confirmation-window-size", 
HornetQDefaultConfiguration.getDefaultBridgeConfirmationWindowSize(),
                     Validators.GT_ZERO);
 
       long retryInterval = getLong(brNode, "retry-interval", 
HornetQClient.DEFAULT_RETRY_INTERVAL, Validators.GT_ZERO);
@@ -1561,53 +1826,34 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
          }
       }
 
-      BridgeConfiguration config;
+      BridgeConfiguration config = new BridgeConfiguration()
+         .setName(name)
+         .setQueueName(queueName)
+         .setForwardingAddress(forwardingAddress)
+         .setFilterString(filterString)
+         .setTransformerClassName(transformerClassName)
+         .setMinLargeMessageSize(minLargeMessageSize)
+         .setClientFailureCheckPeriod(clientFailureCheckPeriod)
+         .setConnectionTTL(connectionTTL)
+         .setRetryInterval(retryInterval)
+         .setMaxRetryInterval(maxRetryInterval)
+         .setRetryIntervalMultiplier(retryIntervalMultiplier)
+         .setInitialConnectAttempts(initialConnectAttempts)
+         .setReconnectAttempts(reconnectAttempts)
+         .setReconnectAttemptsOnSameNode(reconnectAttemptsSameNode)
+         .setUseDuplicateDetection(useDuplicateDetection)
+         .setConfirmationWindowSize(confirmationWindowSize)
+         .setHA(ha)
+         .setUser(user)
+         .setPassword(password);
 
       if (!staticConnectorNames.isEmpty())
       {
-         config = new BridgeConfiguration(name,
-                                          queueName,
-                                          forwardingAddress,
-                                          filterString,
-                                          transformerClassName,
-                                          minLargeMessageSize,
-                                          clientFailureCheckPeriod,
-                                          connectionTTL,
-                                          retryInterval,
-                                          maxRetryInterval,
-                                          retryIntervalMultiplier,
-                                          initialConnectAttempts,
-                                          reconnectAttempts,
-                                          reconnectAttemptsSameNode,
-                                          useDuplicateDetection,
-                                          confirmationWindowSize,
-                                          staticConnectorNames,
-                                          ha,
-                                          user,
-                                          password);
+         config.setStaticConnectors(staticConnectorNames);
       }
       else
       {
-         config = new BridgeConfiguration(name,
-                                          queueName,
-                                          forwardingAddress,
-                                          filterString,
-                                          transformerClassName,
-                                          minLargeMessageSize,
-                                          clientFailureCheckPeriod,
-                                          connectionTTL,
-                                          retryInterval,
-                                          maxRetryInterval,
-                                          retryIntervalMultiplier,
-                                          initialConnectAttempts,
-                                          reconnectAttempts,
-                                          reconnectAttemptsSameNode,
-                                          useDuplicateDetection,
-                                          confirmationWindowSize,
-                                          discoveryGroupName,
-                                          ha,
-                                          user,
-                                          password);
+         config.setDiscoveryGroupName(discoveryGroupName);
       }
 
       mainConfig.getBridgeConfigurations().add(config);
@@ -1655,13 +1901,14 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
          }
       }
 
-      DivertConfiguration config = new DivertConfiguration(name,
-                                                           routingName,
-                                                           address,
-                                                           forwardingAddress,
-                                                           exclusive,
-                                                           filterString,
-                                                           
transformerClassName);
+      DivertConfiguration config = new DivertConfiguration()
+         .setName(name)
+         .setRoutingName(routingName)
+         .setAddress(address)
+         .setForwardingAddress(forwardingAddress)
+         .setExclusive(exclusive)
+         .setFilterString(filterString)
+         .setTransformerClassName(transformerClassName);
 
       mainConfig.getDivertConfigurations().add(config);
    }
@@ -1693,6 +1940,9 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil
          params.put(key, nValue.getTextContent());
       }
 
-      return new ConnectorServiceConfiguration(clazz, params, name);
+      return new ConnectorServiceConfiguration()
+         .setFactoryClassName(clazz)
+         .setParams(params)
+         .setName(name);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/management/impl/HornetQServerControlImpl.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/management/impl/HornetQServerControlImpl.java
 
b/hornetq-server/src/main/java/org/hornetq/core/management/impl/HornetQServerControlImpl.java
index ad9cc47..d606178 100644
--- 
a/hornetq-server/src/main/java/org/hornetq/core/management/impl/HornetQServerControlImpl.java
+++ 
b/hornetq-server/src/main/java/org/hornetq/core/management/impl/HornetQServerControlImpl.java
@@ -24,25 +24,25 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.management.ListenerNotFoundException;
 import javax.management.MBeanNotificationInfo;
 import javax.management.MBeanOperationInfo;
+import javax.management.Notification;
 import javax.management.NotificationBroadcasterSupport;
 import javax.management.NotificationEmitter;
 import javax.management.NotificationFilter;
 import javax.management.NotificationListener;
 import javax.transaction.xa.Xid;
 
-import org.hornetq.api.config.HornetQDefaultConfiguration;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.management.AddressControl;
 import org.hornetq.api.core.management.BridgeControl;
+import org.hornetq.api.core.management.CoreNotificationType;
 import org.hornetq.api.core.management.DivertControl;
 import org.hornetq.api.core.management.HornetQServerControl;
-import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.api.core.management.QueueControl;
 import org.hornetq.core.config.BridgeConfiguration;
 import org.hornetq.core.config.Configuration;
@@ -52,18 +52,29 @@ import 
org.hornetq.core.messagecounter.impl.MessageCounterManagerImpl;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.persistence.config.PersistedAddressSetting;
 import org.hornetq.core.persistence.config.PersistedRoles;
+import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.DuplicateIDCache;
 import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.postoffice.impl.LocalQueueBinding;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.security.CheckType;
 import org.hornetq.core.security.Role;
+import org.hornetq.core.server.Consumer;
 import org.hornetq.core.server.HornetQMessageBundle;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServerLogger;
 import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerConsumer;
 import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.cluster.ha.HAPolicy;
+import org.hornetq.core.server.cluster.ha.LiveOnlyPolicy;
+import org.hornetq.core.server.cluster.ha.ScaleDownPolicy;
+import org.hornetq.core.server.cluster.ha.SharedStoreSlavePolicy;
 import org.hornetq.core.server.group.GroupingHandler;
 import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
 import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.settings.impl.SlowConsumerPolicy;
 import org.hornetq.core.transaction.ResourceManager;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.TransactionDetail;
@@ -71,6 +82,7 @@ import 
org.hornetq.core.transaction.impl.CoreTransactionDetail;
 import org.hornetq.core.transaction.impl.XidImpl;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.utils.SecurityFormatter;
+import org.hornetq.utils.TypedProperties;
 import org.hornetq.utils.json.JSONArray;
 import org.hornetq.utils.json.JSONObject;
 
@@ -79,7 +91,8 @@ import org.hornetq.utils.json.JSONObject;
  *
  *
  */
-public class HornetQServerControlImpl extends AbstractControl implements 
HornetQServerControl, NotificationEmitter
+public class HornetQServerControlImpl extends AbstractControl implements 
HornetQServerControl, NotificationEmitter,
+                                                                         
org.hornetq.core.server.management.NotificationListener
 {
    // Constants -----------------------------------------------------
 
@@ -99,6 +112,7 @@ public class HornetQServerControlImpl extends 
AbstractControl implements HornetQ
 
    private final NotificationBroadcasterSupport broadcaster;
 
+   private final AtomicLong notifSeq = new AtomicLong(0);
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -120,6 +134,7 @@ public class HornetQServerControlImpl extends 
AbstractControl implements HornetQ
       server = messagingServer;
       this.messageCounterManager = messageCounterManager;
       this.broadcaster = broadcaster;
+      server.getManagementService().addNotificationListener(this);
    }
 
    // HornetQServerControlMBean implementation --------------------
@@ -159,7 +174,7 @@ public class HornetQServerControlImpl extends 
AbstractControl implements HornetQ
       clearIO();
       try
       {
-         return configuration.getHAPolicy().isBackup();
+         return server.getHAPolicy().isBackup();
       }
       finally
       {
@@ -174,7 +189,7 @@ public class HornetQServerControlImpl extends 
AbstractControl implements HornetQ
       clearIO();
       try
       {
-         return configuration.getHAPolicy().isSharedStore();
+         return server.getHAPolicy().isSharedStore();
       }
       finally
       {
@@ -284,7 +299,11 @@ public class HornetQServerControlImpl extends 
AbstractControl implements HornetQ
       clearIO();
       try
       {
-         
configuration.getHAPolicy().setFailoverOnServerShutdown(failoverOnServerShutdown);
+         HAPolicy haPolicy = server.getHAPolicy();
+         if (haPolicy instanceof SharedStoreSlavePolicy)
+         {
+            ((SharedStoreSlavePolicy) 
haPolicy).setFailoverOnServerShutdown(failoverOnServerShutdown);
+         }
       }
       finally
       {
@@ -300,7 +319,15 @@ public class HornetQServerControlImpl extends 
AbstractControl implements HornetQ
       clearIO();
       try
       {
-         return configuration.getHAPolicy().isFailoverOnServerShutdown();
+         HAPolicy haPolicy = server.getHAPolicy();
+         if (haPolicy instanceof SharedStoreSlavePolicy)
+         {
+            return ((SharedStoreSlavePolicy) 
haPolicy).isFailoverOnServerShutdown();
+         }
+         else
+         {
+            return false;
+         }
       }
       finally
       {
@@ -1238,8 +1265,8 @@ public class HornetQServerControlImpl extends 
AbstractControl implements HornetQ
             String remoteAddress = connection.getRemoteAddress();
             if (remoteAddress.contains(ipAddress))
             {
-               remotingService.removeConnection(connection.getID());
                
connection.fail(HornetQMessageBundle.BUNDLE.connectionsClosedByManagement(ipAddress));
+               remotingService.removeConnection(connection.getID());
                closed = true;
             }
          }
@@ -1253,6 +1280,94 @@ public class HornetQServerControlImpl extends 
AbstractControl implements HornetQ
 
    }
 
+   /**
+    * Closes the remoting connections used by consumers on local queues bound to the given address.
+    * <p>
+    * Every binding returned by {@code postOffice.getMatchingBindings(address)} that is a
+    * {@link LocalQueueBinding} is inspected; for each {@link ServerConsumer} on its queue the
+    * remoting connection whose ID string equals the consumer's connection ID is failed and then
+    * removed from the remoting service.
+    * <p>
+    * Any exception raised while doing so is logged through {@link HornetQServerLogger} and is not
+    * propagated; the value accumulated so far is returned.
+    *
+    * @param address the address whose consumer connections should be closed
+    * @return {@code true} if at least one connection was failed and removed, {@code false} otherwise
+    */
+   public synchronized boolean closeConsumerConnectionsForAddress(final String address)
+   {
+      boolean closed = false;
+      checkStarted();
+
+      clearIO();
+      try
+      {
+         for (Binding binding : postOffice.getMatchingBindings(SimpleString.toSimpleString(address)).getBindings())
+         {
+            if (binding instanceof LocalQueueBinding)
+            {
+               Queue queue = ((LocalQueueBinding) binding).getQueue();
+               for (Consumer consumer : queue.getConsumers())
+               {
+                  if (consumer instanceof ServerConsumer)
+                  {
+                     ServerConsumer serverConsumer = (ServerConsumer) consumer;
+                     RemotingConnection connection = null;
+
+                     // Locate the live remoting connection that owns this consumer.
+                     for (RemotingConnection potentialConnection : remotingService.getConnections())
+                     {
+                        if (potentialConnection.getID().toString().equals(serverConsumer.getConnectionID()))
+                        {
+                           connection = potentialConnection;
+                        }
+                     }
+
+                     if (connection != null)
+                     {
+                        // Fail the connection first and only then remove it, the same order
+                        // closeConnectionsForAddress uses in this class.
+                        connection.fail(HornetQMessageBundle.BUNDLE.consumerConnectionsClosedByManagement(address));
+                        remotingService.removeConnection(connection.getID());
+                        closed = true;
+                     }
+                  }
+               }
+            }
+         }
+      }
+      catch (Exception e)
+      {
+         HornetQServerLogger.LOGGER.failedToCloseConsumerConnectionsForAddress(address, e);
+      }
+      finally
+      {
+         blockOnIO();
+      }
+      return closed;
+   }
+
+   /**
+    * Closes the remoting connections of every server session opened by the given user.
+    * <p>
+    * For each session returned by {@code server.getSessions()} whose username is non-null and
+    * equal to {@code userName}, the remoting connection whose ID string equals the session's
+    * connection ID string is failed and then removed from the remoting service.
+    *
+    * @param userName the user whose connections should be closed
+    * @return {@code true} if at least one connection was failed and removed, {@code false} otherwise
+    */
+   public synchronized boolean closeConnectionsForUser(final String userName)
+   {
+      boolean closed = false;
+      checkStarted();
+
+      clearIO();
+      try
+      {
+         for (ServerSession serverSession : server.getSessions())
+         {
+            if (serverSession.getUsername() != null && serverSession.getUsername().equals(userName))
+            {
+               RemotingConnection connection = null;
+
+               // Locate the live remoting connection that owns this session.
+               for (RemotingConnection potentialConnection : remotingService.getConnections())
+               {
+                  if (potentialConnection.getID().toString().equals(serverSession.getConnectionID().toString()))
+                  {
+                     connection = potentialConnection;
+                  }
+               }
+
+               if (connection != null)
+               {
+                  // Fail the connection first and only then remove it, the same order
+                  // closeConnectionsForAddress uses in this class.
+                  connection.fail(HornetQMessageBundle.BUNDLE.connectionsForUserClosedByManagement(userName));
+                  remotingService.removeConnection(connection.getID());
+                  closed = true;
+               }
+            }
+         }
+      }
+      finally
+      {
+         blockOnIO();
+      }
+      return closed;
+   }
+
    public String[] listConnectionIDs()
    {
       checkStarted();
@@ -1511,6 +1626,11 @@ public class HornetQServerControlImpl extends 
AbstractControl implements HornetQ
             : addressSettings.getAddressFullMessagePolicy() == 
AddressFullMessagePolicy.DROP ? "DROP"
             : "FAIL";
       settings.put("addressFullMessagePolicy", policy);
+      settings.put("slowConsumerThreshold", 
addressSettings.getSlowConsumerThreshold());
+      settings.put("slowConsumerCheckPeriod", 
addressSettings.getSlowConsumerCheckPeriod());
+      policy = addressSettings.getSlowConsumerPolicy() == 
SlowConsumerPolicy.NOTIFY ? "NOTIFY"
+         : "KILL";
+      settings.put("slowConsumerPolicy", policy);
 
       JSONObject jsonObject = new JSONObject(settings);
       return jsonObject.toString();
@@ -1531,7 +1651,10 @@ public class HornetQServerControlImpl extends 
AbstractControl implements HornetQ
                                   final long maxRedeliveryDelay,
                                   final long redistributionDelay,
                                   final boolean sendToDLAOnNoRoute,
-                                  final String addressFullMessagePolicy) 
throws Exception
+                                  final String addressFullMessagePolicy,
+                                  final long slowConsumerThreshold,
+                                  final long slowConsumerCheckPeriod,
+                                  final String slowConsumerPolicy) throws 
Exception
    {
       checkStarted();
 
@@ -1580,6 +1703,20 @@ public class HornetQServerControlImpl extends 
AbstractControl implements HornetQ
       {
          
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
       }
+      addressSettings.setSlowConsumerThreshold(slowConsumerThreshold);
+      addressSettings.setSlowConsumerCheckPeriod(slowConsumerCheckPeriod);
+      if (slowConsumerPolicy == null)
+      {
+         addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
+      }
+      else if (slowConsumerPolicy.equalsIgnoreCase("NOTIFY"))
+      {
+         addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
+      }
+      else if (slowConsumerPolicy.equalsIgnoreCase("KILL"))
+      {
+         addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
+      }
       server.getAddressSettingsRepository().addMatch(address, addressSettings);
 
       storageManager.storeAddressSetting(new PersistedAddressSetting(new 
SimpleString(address), addressSettings));
@@ -1652,13 +1789,14 @@ public class HornetQServerControlImpl extends 
AbstractControl implements HornetQ
       clearIO();
       try
       {
-         DivertConfiguration config = new DivertConfiguration(name,
-               routingName,
-               address,
-               forwardingAddress,
-               exclusive,
-               filterString,
-               transformerClassName);
+         DivertConfiguration config = new DivertConfiguration()
+            .setName(name)
+            .setRoutingName(routingName)
+            .setAddress(address)
+            .setForwardingAddress(forwardingAddress)
+            .setExclusive(exclusive)
+            .setFilterString(filterString)
+            .setTransformerClassName(transformerClassName);
          server.deployDivert(config);
       }
       finally
@@ -1717,7 +1855,7 @@ public class HornetQServerControlImpl extends 
AbstractControl implements HornetQ
                             final boolean useDuplicateDetection,
                             final int confirmationWindowSize,
                             final long clientFailureCheckPeriod,
-                            final String connectorNames,
+                            final String staticConnectorsOrDiscoveryGroup,
                             boolean useDiscoveryGroup,
                             final boolean ha,
                             final String user,
@@ -1727,57 +1865,34 @@ public class HornetQServerControlImpl extends 
AbstractControl implements HornetQ
 
       clearIO();
 
-
       try
       {
-         BridgeConfiguration config = null;
+         BridgeConfiguration config = new BridgeConfiguration()
+               .setName(name)
+               .setQueueName(queueName)
+               .setForwardingAddress(forwardingAddress)
+               .setFilterString(filterString)
+               .setTransformerClassName(transformerClassName)
+               .setClientFailureCheckPeriod(clientFailureCheckPeriod)
+               .setRetryInterval(retryInterval)
+               .setRetryIntervalMultiplier(retryIntervalMultiplier)
+               .setInitialConnectAttempts(initialConnectAttempts)
+               .setReconnectAttempts(reconnectAttempts)
+               .setUseDuplicateDetection(useDuplicateDetection)
+               .setConfirmationWindowSize(confirmationWindowSize)
+               .setHA(ha)
+               .setUser(user)
+               .setPassword(password);
+
          if (useDiscoveryGroup)
          {
-            config = new BridgeConfiguration(name,
-                  queueName,
-                  forwardingAddress,
-                  filterString,
-                  transformerClassName,
-                  HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
-                  clientFailureCheckPeriod,
-                  HornetQClient.DEFAULT_CONNECTION_TTL,
-                  retryInterval,
-                  HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
-                  retryIntervalMultiplier,
-                  initialConnectAttempts,
-                  reconnectAttempts,
-                  
HornetQDefaultConfiguration.getDefaultBridgeConnectSameNode(),
-                  useDuplicateDetection,
-                  confirmationWindowSize,
-                  connectorNames,
-                  ha,
-                  user,
-                  password);
+            config.setDiscoveryGroupName(staticConnectorsOrDiscoveryGroup);
          }
          else
          {
-            List<String> connectors = toList(connectorNames);
-            config = new BridgeConfiguration(name,
-                  queueName,
-                  forwardingAddress,
-                  filterString,
-                  transformerClassName,
-                  HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
-                  clientFailureCheckPeriod,
-                  HornetQClient.DEFAULT_CONNECTION_TTL,
-                  retryInterval,
-                  HornetQClient.DEFAULT_MAX_RETRY_INTERVAL,
-                  retryIntervalMultiplier,
-                  initialConnectAttempts,
-                  reconnectAttempts,
-                  
HornetQDefaultConfiguration.getDefaultBridgeConnectSameNode(),
-                  useDuplicateDetection,
-                  confirmationWindowSize,
-                  connectors,
-                  ha,
-                  user,
-                  password);
+            
config.setStaticConnectors(toList(staticConnectorsOrDiscoveryGroup));
          }
+
          server.deployBridge(config);
       }
       finally
@@ -1835,6 +1950,35 @@ public class HornetQServerControlImpl extends 
AbstractControl implements HornetQ
          blockOnIO();
       }
    }
+
+   @Override
+   public void scaleDown(String connector) throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+      HAPolicy haPolicy = server.getHAPolicy();
+      if (haPolicy instanceof LiveOnlyPolicy)
+      {
+         LiveOnlyPolicy liveOnlyPolicy = (LiveOnlyPolicy) haPolicy;
+
+         if (liveOnlyPolicy.getScaleDownPolicy() == null)
+         {
+            liveOnlyPolicy.setScaleDownPolicy(new ScaleDownPolicy());
+         }
+
+         liveOnlyPolicy.getScaleDownPolicy().setEnabled(true);
+
+         if (connector != null)
+         {
+            liveOnlyPolicy.getScaleDownPolicy().getConnectors().add(0, 
connector);
+         }
+
+         server.stop(true);
+      }
+
+   }
+
    // NotificationEmitter implementation ----------------------------
 
    public void removeNotificationListener(final NotificationListener listener,
@@ -1882,7 +2026,7 @@ public class HornetQServerControlImpl extends 
AbstractControl implements HornetQ
 
    public MBeanNotificationInfo[] getNotificationInfo()
    {
-      NotificationType[] values = NotificationType.values();
+      CoreNotificationType[] values = CoreNotificationType.values();
       String[] names = new String[values.length];
       for (int i = 0; i < values.length; i++)
       {
@@ -2028,5 +2172,16 @@ public class HornetQServerControlImpl extends 
AbstractControl implements HornetQ
       return list;
    }
 
+   @Override
+   public void onNotification(org.hornetq.core.server.management.Notification 
notification)
+   {
+      if (!(notification.getType() instanceof CoreNotificationType)) return;
+      CoreNotificationType type = (CoreNotificationType) 
notification.getType();
+      TypedProperties prop = notification.getProperties();
+
+      this.broadcaster.sendNotification(new Notification(type.toString(), this,
+            notifSeq.incrementAndGet(), notification.toString()));
+   }
+
 }
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/management/impl/QueueControlImpl.java
 
b/hornetq-server/src/main/java/org/hornetq/core/management/impl/QueueControlImpl.java
index 7d3102c..8208ad4 100644
--- 
a/hornetq-server/src/main/java/org/hornetq/core/management/impl/QueueControlImpl.java
+++ 
b/hornetq-server/src/main/java/org/hornetq/core/management/impl/QueueControlImpl.java
@@ -254,6 +254,21 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl
       }
    }
 
+   public long getMessagesAcknowledged()
+   {
+      checkStarted();
+
+      clearIO();
+      try
+      {
+         return queue.getMessagesAcknowledged();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
    public long getID()
    {
       checkStarted();
@@ -926,6 +941,22 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl
       }
    }
 
+
+   public void flushExecutor()
+   {
+      checkStarted();
+
+      clearIO();
+      try
+      {
+         queue.flushExecutor();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+   }
+
    @Override
    public String listConsumersAsJSON() throws Exception
    {
@@ -987,6 +1018,22 @@ public class QueueControlImpl extends AbstractControl 
implements QueueControl
 
    }
 
+   public void resetMessagesAcknowledged() throws Exception
+   {
+      checkStarted();
+
+      clearIO();
+      try
+      {
+         queue.resetMessagesAcknowledged();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/messagecounter/MessageCounter.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/messagecounter/MessageCounter.java
 
b/hornetq-server/src/main/java/org/hornetq/core/messagecounter/MessageCounter.java
index d342bce..96a0851 100644
--- 
a/hornetq-server/src/main/java/org/hornetq/core/messagecounter/MessageCounter.java
+++ 
b/hornetq-server/src/main/java/org/hornetq/core/messagecounter/MessageCounter.java
@@ -130,7 +130,7 @@ public class MessageCounter
    {
       public void run()
       {
-         long latestMessagesAdded = serverQueue.getInstantMessagesAdded();
+         long latestMessagesAdded = serverQueue.getMessagesAdded();
 
          long newMessagesAdded = latestMessagesAdded - lastMessagesAdded;
 
@@ -213,7 +213,7 @@ public class MessageCounter
     */
    public long getMessageCount()
    {
-      return serverQueue.getInstantMessageCount();
+      return serverQueue.getMessageCount();
    }
 
    /**
@@ -222,7 +222,7 @@ public class MessageCounter
     */
    public long getMessageCountDelta()
    {
-      long current = serverQueue.getInstantMessageCount();
+      long current = serverQueue.getMessageCount();
       int delta = (int)(current - depthLast);
 
       depthLast = current;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/paging/PagingManager.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/paging/PagingManager.java 
b/hornetq-server/src/main/java/org/hornetq/core/paging/PagingManager.java
index def8e4d..7e6b0e1 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/paging/PagingManager.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/paging/PagingManager.java
@@ -21,16 +21,16 @@ import 
org.hornetq.core.settings.HierarchicalRepositoryChangeListener;
 /**
  * <PRE>
  *
- * +------------+      1  +-------------+       N +------------+       N 
+-------+       1 +----------------+
- * | {@link PostOffice} |-------&gt; |{@link PagingManager}|-------&gt; 
|{@link PagingStore} | ------&gt; | {@link Page}  | ------&gt; | {@link 
SequentialFile} |
- * +------------+         +-------------+         +------------+         
+-------+         +----------------+
- *                               |                       1 ^
- *                               |                         |
- *                               |                         |
- *                               |                         | 1
- *                               |        N +---------+   /
- *                               +--------&gt; | {@link Address} |
- *                                          +---------+
+ * +--------------+      1  +----------------+       N +--------------+       
N +--------+       1 +-------------------+
+ * | {@link org.hornetq.core.postoffice.PostOffice} |-------&gt; |{@link 
PagingManager}|-------&gt; |{@link PagingStore} | ------&gt; | {@link 
org.hornetq.core.paging.impl.Page}  | ------&gt; | {@link 
org.hornetq.core.journal.SequentialFile} |
+ * +--------------+         +----------------+         +--------------+        
 +--------+         +-------------------+
+ *                                                              |              
    1 ^
+ *                                                              |              
      |
+ *                                                              |              
      |
+ *                                                              |              
      | 1
+ *                                                              |            N 
+----------+
+ *                                                              
+------------&gt; | {@link org.hornetq.core.postoffice.Address} |
+ *                                                                             
+----------+
  * </PRE>
  * @author <a href="mailto:[email protected]";>Clebert Suconic</a>
  * @author <a href="mailto:[email protected]";>Tim Fox</a>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/paging/PagingStore.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/paging/PagingStore.java 
b/hornetq-server/src/main/java/org/hornetq/core/paging/PagingStore.java
index e286f40..d151d72 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/paging/PagingStore.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/paging/PagingStore.java
@@ -154,7 +154,7 @@ public interface PagingStore extends HornetQComponent
    /**
     * Sends the pages with given IDs to the {@link ReplicationManager}.
     * <p/>
-    * Sending is done here to avoid exposing the internal {@link 
SequentialFile}s.
+    * Sending is done here to avoid exposing the internal {@link 
org.hornetq.core.journal.SequentialFile}s.
     *
     * @param replicator
     * @param pageIds

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
 
b/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
index bf85d1d..11a5ba4 100644
--- 
a/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ 
b/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -91,9 +91,9 @@ public class PageCursorProviderImpl implements 
PageCursorProvider
 
    public synchronized PageSubscription createSubscription(long cursorID, 
Filter filter, boolean persistent)
    {
-      if (HornetQServerLogger.LOGGER.isDebugEnabled())
+      if (HornetQServerLogger.LOGGER.isTraceEnabled())
       {
-         HornetQServerLogger.LOGGER.debug(this.pagingStore.getAddress() + " 
creating subscription " + cursorID + " with filter " + filter, new 
Exception("trace"));
+         HornetQServerLogger.LOGGER.trace(this.pagingStore.getAddress() + " 
creating subscription " + cursorID + " with filter " + filter, new 
Exception("trace"));
       }
 
       if (activeCursors.containsKey(cursorID))

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
 
b/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
index 5b2cedc..12fda59 100644
--- 
a/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
+++ 
b/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PagePositionImpl.java
@@ -41,14 +41,14 @@ public class PagePositionImpl implements PagePosition
     */
    public PagePositionImpl(long pageNr, int messageNr)
    {
-      super();
+      this();
       this.pageNr = pageNr;
       this.messageNr = messageNr;
    }
 
    public PagePositionImpl()
    {
-
+      super();
    }
 
    /**
@@ -151,4 +151,12 @@ public class PagePositionImpl implements PagePosition
       return "PagePositionImpl [pageNr=" + pageNr + ", messageNr=" + messageNr 
+ ", recordID=" + recordID + "]";
    }
 
+   /**
+    * I needed a finalize method defined here just as a way to get a hook on 
the PagingLeakTest through Byteman.
+    * There is a rule for finalizing it where I'm establishing a counter, and 
that rule won't work without this method defined.
+    * So, please don't remove it unless you have to remove that test for any 
weird reason; it's here for a purpose!
+    */
+   protected void finalize()
+   {
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
 
b/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
index 4100d0d..ddd0880 100644
--- 
a/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
+++ 
b/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
@@ -346,7 +346,7 @@ public class PageSubscriptionCounterImpl implements 
PageSubscriptionCounter
 
       long newRecordID = -1;
 
-      long txCleanup = storage.generateUniqueID();
+      long txCleanup = storage.generateID();
 
       try
       {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
 
b/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
index 24d8bb7..acd56f6 100644
--- 
a/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ 
b/hornetq-server/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -335,6 +335,7 @@ final class PageSubscriptionImpl implements PageSubscription
          }
 
          infoPG.acks.clear();
+         infoPG.removedReferences.clear();
       }
 
       tx.addOperation(new TransactionOperationAbstract()
@@ -672,7 +673,7 @@ final class PageSubscriptionImpl implements PageSubscription
     */
    public void destroy() throws Exception
    {
-      final long tx = store.generateUniqueID();
+      final long tx = store.generateID();
       try
       {
 
@@ -752,7 +753,7 @@ final class PageSubscriptionImpl implements PageSubscription
                HornetQServerLogger.LOGGER.pageNotFound(pos);
                if (txDeleteCursorOnReload == -1)
                {
-                  txDeleteCursorOnReload = store.generateUniqueID();
+                  txDeleteCursorOnReload = store.generateID();
                }
                
store.deleteCursorAcknowledgeTransactional(txDeleteCursorOnReload, 
pos.getRecordID());
             }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
 
b/hornetq-server/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
index 2c16ef0..df06d41 100644
--- 
a/hornetq-server/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
+++ 
b/hornetq-server/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
@@ -926,7 +926,7 @@ public class PagingStoreImpl implements PagingStore
 
             currentPage.write(pagedMessage);
 
-            if (tx == null && syncNonTransactional)
+            if (tx == null && syncNonTransactional && message.isDurable())
             {
                sync();
             }
@@ -1197,7 +1197,7 @@ public class PagingStoreImpl implements PagingStore
    }
 
    // To be used on isDropMessagesWhenFull
-   private boolean isFull()
+   public boolean isFull()
    {
       return maxSize > 0 && getAddressSize() > maxSize;
    }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/persistence/StorageManager.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/persistence/StorageManager.java 
b/hornetq-server/src/main/java/org/hornetq/core/persistence/StorageManager.java
index 7173f45..559a3c9 100644
--- 
a/hornetq-server/src/main/java/org/hornetq/core/persistence/StorageManager.java
+++ 
b/hornetq-server/src/main/java/org/hornetq/core/persistence/StorageManager.java
@@ -46,6 +46,7 @@ import org.hornetq.core.server.group.impl.GroupBinding;
 import org.hornetq.core.server.impl.JournalLoader;
 import org.hornetq.core.transaction.ResourceManager;
 import org.hornetq.core.transaction.Transaction;
+import org.hornetq.utils.IDGenerator;
 
 /**
  * A StorageManager
@@ -53,8 +54,17 @@ import org.hornetq.core.transaction.Transaction;
  * @author <a href="mailto:[email protected]";>Tim Fox</a>
  * @author <a href="mailto:[email protected]";>Clebert Suconic</a>
  * @author <a href="mailto:[email protected]>Andy Taylor</a>
+ *
+ * Note about IDGenerator
+ *
+ *  I've changed StorageManager to extend IDGenerator, because in some places
+ *  all we needed from the StorageManager was the ID generation.
+ *  I couldn't just get the IDGenerator from the inner part because the 
NullPersistent has its own sequence.
+ *  So the best option was to add the interface and adjust the callers for the method.
+ *
+
  */
-public interface StorageManager extends HornetQComponent
+public interface StorageManager extends IDGenerator, HornetQComponent
 {
 
    /**
@@ -144,10 +154,6 @@ public interface StorageManager extends HornetQComponent
 
    void clearContext();
 
-   long generateUniqueID();
-
-   long getCurrentUniqueID();
-
    /**
     * Confirms that a large message was finished
     */
@@ -336,8 +342,7 @@ public interface StorageManager extends HornetQComponent
    Journal getMessageJournal();
 
    /**
-    * @see JournalStorageManager#startReplication(ReplicationManager, 
PagingManager, String,
-    * boolean)
+    * @see 
org.hornetq.core.persistence.impl.journal.JournalStorageManager#startReplication(org.hornetq.core.replication.ReplicationManager,
 org.hornetq.core.paging.PagingManager, String, boolean)
     */
    void startReplication(ReplicationManager replicationManager, PagingManager 
pagingManager, String nodeID,
                          boolean autoFailBack) throws Exception;
@@ -397,10 +402,10 @@ public interface StorageManager extends HornetQComponent
    void readUnLock();
 
    /**
-    * Closes the {@link IDGenerator} persisting the current record ID.
+    * Closes the {@link org.hornetq.utils.IDGenerator} persisting the current 
record ID.
     * <p/>
     * Effectively a "pre-stop" method. Necessary due to the "stop"-order at
-    * {@link HornetQServerImpl}
+    * {@link org.hornetq.core.server.impl.HornetQServerImpl}
     */
    void persistIdGenerator();
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/DescribeJournal.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/DescribeJournal.java
 
b/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/DescribeJournal.java
index 77f01dc..d00e056 100644
--- 
a/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/DescribeJournal.java
+++ 
b/hornetq-server/src/main/java/org/hornetq/core/persistence/impl/journal/DescribeJournal.java
@@ -14,17 +14,14 @@ package org.hornetq.core.persistence.impl.journal;
 
 import javax.transaction.xa.Xid;
 import java.io.PrintStream;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.api.core.Message;
-import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.journal.PreparedTransactionInfo;
@@ -667,29 +664,8 @@ public final class DescribeJournal
          {
             buffer.append(";userMessageID=" + msg.getUserID().toString());
          }
-         buffer.append(";properties=[");
 
-         Set<SimpleString> properties = msg.getPropertyNames();
-
-         for (SimpleString prop : properties)
-         {
-            Object value = msg.getObjectProperty(prop);
-            if (value instanceof byte[])
-            {
-               buffer.append(prop + "=" + Arrays.toString((byte[])value) + 
",");
-
-            }
-            else
-            {
-               buffer.append(prop + "=" + value + ",");
-            }
-         }
-
-         buffer.append("#properties = " + properties.size());
-
-         buffer.append("]");
-
-         buffer.append(" - " + msg.toString());
+         buffer.append(";msg=" + msg.toString());
 
          return buffer.toString();
       }

Reply via email to