http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/HornetQMessageBundle.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/server/HornetQMessageBundle.java
 
b/hornetq-server/src/main/java/org/hornetq/core/server/HornetQMessageBundle.java
index 8b609e8..2034b61 100644
--- 
a/hornetq-server/src/main/java/org/hornetq/core/server/HornetQMessageBundle.java
+++ 
b/hornetq-server/src/main/java/org/hornetq/core/server/HornetQMessageBundle.java
@@ -356,4 +356,16 @@ public interface HornetQMessageBundle
 
    @Message(id = 119105, value = "Server will not accept create session 
request since scale down has not occurred", format = 
Message.Format.MESSAGE_FORMAT)
    HornetQSessionCreationException sessionNotFailedOver();
+
+   @Message(id = 119106, value = "Invalid slow consumer policy type {0}", 
format = Message.Format.MESSAGE_FORMAT)
+   IllegalArgumentException invalidSlowConsumerPolicyType(String val);
+
+   @Message(id = 119107, value = "consumer connections for address {0} closed 
by management", format = Message.Format.MESSAGE_FORMAT)
+   HornetQInternalErrorException consumerConnectionsClosedByManagement(String 
address);
+
+   @Message(id = 119108, value = "connections for user {0} closed by 
management", format = Message.Format.MESSAGE_FORMAT)
+   HornetQInternalErrorException connectionsForUserClosedByManagement(String 
userName);
+
+   @Message(id = 119109, value = "unsupported HA Policy Configuration {0}", 
format = Message.Format.MESSAGE_FORMAT)
+   HornetQIllegalStateException unsupportedHAPolicyConfiguration(Object o);
 }
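
The new 119106 bundle entry follows the usual JBoss Logging pattern: the generated implementation builds the exception with the formatted message, so callers simply throw the result. A minimal sketch of a caller, assuming the conventional BUNDLE lookup field on HornetQMessageBundle and the KILL/NOTIFY policy names (neither is shown in this diff):

   import java.util.Arrays;
   import java.util.List;

   import org.hornetq.core.server.HornetQMessageBundle;

   public final class SlowConsumerPolicyValidator
   {
      private static final List<String> VALID = Arrays.asList("KILL", "NOTIFY"); // assumed names

      // Throws the new id-119106 IllegalArgumentException when the configured value is unknown.
      public static String validate(String val)
      {
         if (!VALID.contains(val.toUpperCase()))
         {
            throw HornetQMessageBundle.BUNDLE.invalidSlowConsumerPolicyType(val);
         }
         return val;
      }
   }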

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/HornetQServer.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/server/HornetQServer.java 
b/hornetq-server/src/main/java/org/hornetq/core/server/HornetQServer.java
index 56bbc4a..7e9ee28 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/HornetQServer.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/HornetQServer.java
@@ -16,11 +16,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.core.config.BridgeConfiguration;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.DivertConfiguration;
@@ -28,14 +24,14 @@ import 
org.hornetq.core.management.impl.HornetQServerControlImpl;
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.remoting.server.RemotingService;
-import org.hornetq.core.replication.ReplicationEndpoint;
 import org.hornetq.core.replication.ReplicationManager;
 import org.hornetq.core.security.Role;
-import org.hornetq.core.server.cluster.ClusterConnection;
+import org.hornetq.core.security.SecurityStore;
 import org.hornetq.core.server.cluster.ClusterManager;
+import org.hornetq.core.server.cluster.ha.HAPolicy;
 import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.core.server.impl.Activation;
 import org.hornetq.core.server.impl.ConnectorsService;
 import org.hornetq.core.server.management.ManagementService;
 import org.hornetq.core.settings.HierarchicalRepository;
@@ -112,7 +108,10 @@ public interface HornetQServer extends HornetQComponent
                                boolean preAcknowledge,
                                boolean xa,
                                String defaultAddress,
-                               SessionCallback callback) throws Exception;
+                               SessionCallback callback,
+                               ServerSessionFactory sessionFactory) throws 
Exception;
+
+   SecurityStore getSecurityStore();
 
    void removeSession(String name) throws Exception;
 
@@ -153,17 +152,6 @@ public interface HornetQServer extends HornetQComponent
    boolean waitForActivation(long timeout, TimeUnit unit) throws 
InterruptedException;
 
    /**
-    * Wait for backup synchronization when using synchronization
-    * @param timeout
-    * @param unit
-    * @see CountDownLatch#await(long, TimeUnit)
-    * @return {@code true} if the server was already initialized or if it was 
initialized within the
-    *         timeout period, {@code false} otherwise.
-    * @throws InterruptedException
-    */
-   boolean waitForBackupSync(long timeout, TimeUnit unit) throws 
InterruptedException;
-
-   /**
     * Creates a shared queue. If non-durable it will exist as long as there 
are consumers.
     *
     * Notice: the queue won't be deleted until the first consumer arrives.
@@ -211,8 +199,6 @@ public interface HornetQServer extends HornetQComponent
 
    GroupingHandler getGroupingHandler();
 
-   ReplicationEndpoint getReplicationEndpoint();
-
    ReplicationManager getReplicationManager();
 
    void deployDivert(DivertConfiguration config) throws Exception;
@@ -238,21 +224,6 @@ public interface HornetQServer extends HornetQComponent
 
    void stop(boolean failoverOnServerShutdown) throws Exception;
 
-   /**
-    * Starts replication.
-    * <p>
-    * This will spawn a new thread that will sync all persistent data with the 
new backup. This
-    * method may also trigger fail-back if the backup asks for it and the 
server configuration
-    * allows.
-    * @param rc
-    * @param pair
-    * @param clusterConnection
-    * @throws HornetQAlreadyReplicatingException if replication is already 
taking place
-    * @throws HornetQException
-    */
-   void startReplication(CoreRemotingConnection rc, ClusterConnection 
clusterConnection,
-                         Pair<TransportConfiguration, TransportConfiguration> 
pair, boolean failBackRequest) throws HornetQException;
-
    /*
    * add a ProtocolManagerFactory to be used. Note if @see 
Configuration#isResolveProtocols is true then this factory will
    * replace any factories with the same protocol
@@ -269,4 +240,10 @@ public interface HornetQServer extends HornetQComponent
    void addScaledDownNode(SimpleString scaledDownNodeId);
 
    boolean hasScaledDown(SimpleString scaledDownNodeId);
+
+   Activation getActivation();
+
+   HAPolicy getHAPolicy();
+
+   void setHAPolicy(HAPolicy haPolicy);
 }
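
With this change the HA policy and the current Activation are owned by the server instance rather than read from the Configuration. A minimal sketch of a caller using the new accessors (isBackup() and isSharedStore() are taken from the HAPolicy usages further down in this commit):

   import org.hornetq.core.server.HornetQServer;
   import org.hornetq.core.server.cluster.ha.HAPolicy;

   public final class HAPolicyInspector
   {
      // Previously this information came from server.getConfiguration().getHAPolicy().
      public static boolean isSharedStoreBackup(HornetQServer server)
      {
         HAPolicy policy = server.getHAPolicy();
         return policy.isBackup() && policy.isSharedStore();
      }
   }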

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/HornetQServerLogger.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/server/HornetQServerLogger.java 
b/hornetq-server/src/main/java/org/hornetq/core/server/HornetQServerLogger.java
index cdeb3d1..3309946 100644
--- 
a/hornetq-server/src/main/java/org/hornetq/core/server/HornetQServerLogger.java
+++ 
b/hornetq-server/src/main/java/org/hornetq/core/server/HornetQServerLogger.java
@@ -58,6 +58,7 @@ import org.hornetq.core.server.cluster.impl.BridgeImpl;
 import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
 import org.hornetq.core.server.impl.HornetQServerImpl;
 import org.hornetq.core.server.impl.ServerSessionImpl;
+import org.hornetq.core.server.management.Notification;
 import org.hornetq.utils.FutureLatch;
 import org.jboss.logging.BasicLogger;
 import org.jboss.logging.annotations.Cause;
@@ -279,6 +280,18 @@ public interface HornetQServerLogger extends BasicLogger
    @Message(id = 221047, value = "Backup Server has scaled down to live 
server", format = Message.Format.MESSAGE_FORMAT)
    void backupServerScaledDown();
 
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 221048, value = "Consumer {0}:{1} attached to queue ''{2}'' 
from {3} identified as ''slow.'' Expected consumption rate: {4} msgs/second; 
actual consumption rate: {5} msgs/second.", format = 
Message.Format.MESSAGE_FORMAT)
+   void slowConsumerDetected(String sessionID, long consumerID, String 
queueName, String remoteAddress, float slowConsumerThreshold, float 
consumerRate);
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 221049, value = "Activating Replica for node: {0}", format = 
Message.Format.MESSAGE_FORMAT)
+   void activatingReplica(SimpleString nodeID);
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 221050, value = "Activating Shared Store Slave", format = 
Message.Format.MESSAGE_FORMAT)
+   void activatingSharedStoreSlave();
+
    @LogMessage(level = Logger.Level.WARN)
    @Message(id = 222000, value = "HornetQServer is being finalized and has not 
been stopped. Please remember to stop the server before letting it go out of 
scope",
             format = Message.Format.MESSAGE_FORMAT)
@@ -555,11 +568,6 @@ public interface HornetQServerLogger extends BasicLogger
    void errorProcessingIOCallback(Integer errorCode, String errorMessage);
 
    @LogMessage(level = Logger.Level.WARN)
-   @Message(id = 222064, value = "Client with version {0} and address {1} is 
not compatible with server version {2}. Please ensure all clients and servers 
are upgraded to the same version for them to interoperate properly",
-            format = Message.Format.MESSAGE_FORMAT)
-   void incompatibleVersion(Integer version, String remoteAddress, String 
fullVersion);
-
-   @LogMessage(level = Logger.Level.WARN)
    @Message(id = 222065, value = "Client is not being consistent on the 
request versioning. It just sent a version id={0} while it informed {1} 
previously", format = Message.Format.MESSAGE_FORMAT)
    void incompatibleVersionAfterConnect(int version, int clientVersion);
 
@@ -1082,7 +1090,7 @@ public interface HornetQServerLogger extends BasicLogger
 
    @LogMessage(level = Logger.Level.WARN)
    @Message(id = 222187,
-          value = "Failed to activate replicated backup",
+          value = "Failed to activate replicata",
           format = Message.Format.MESSAGE_FORMAT)
    void activateReplicatedBackupFailed(@Cause Throwable e);
 
@@ -1092,6 +1100,12 @@ public interface HornetQServerLogger extends BasicLogger
           format = Message.Format.MESSAGE_FORMAT)
    void unableToFindTargetQueue(String targetNodeID);
 
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222189,
+         value = "Failed to activate shared store slave",
+         format = Message.Format.MESSAGE_FORMAT)
+   void activateSharedStoreSlaveFailed(@Cause Throwable e);
+
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224000, value = "Failure in initialisation", format = 
Message.Format.MESSAGE_FORMAT)
    void initializationError(@Cause Throwable e);
@@ -1330,4 +1344,17 @@ public interface HornetQServerLogger extends BasicLogger
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224061, value = "Setting both <{0}> and <ha-policy> is 
invalid. Please use <ha-policy> exclusively as <{0}> is deprecated. Ignoring 
<{0}> value.", format = Message.Format.MESSAGE_FORMAT)
    void incompatibleWithHAPolicy(String parameter);
+
+   @LogMessage(level = Logger.Level.ERROR)
+   @Message(id = 224062, value = "Failed to send SLOW_CONSUMER notification: 
{0}", format = Message.Format.MESSAGE_FORMAT)
+   void failedToSendSlowConsumerNotification(Notification notification, @Cause 
Exception e);
+
+   @LogMessage(level = Logger.Level.ERROR)
+   @Message(id = 224063, value = "Failed to close consumer connections for 
address {0}", format = Message.Format.MESSAGE_FORMAT)
+   void failedToCloseConsumerConnectionsForAddress(String address, @Cause 
Exception e);
+
+   @LogMessage(level = Logger.Level.ERROR)
+   @Message(id = 224064, value = "Setting <{0}> is invalid with this HA Policy 
Configuration. Please use <ha-policy> exclusively or remove. Ignoring <{0}> 
value.", format = Message.Format.MESSAGE_FORMAT)
+   void incompatibleWithHAPolicyChosen(String parameter);
+
 }
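
The three new INFO messages are emitted through the generated LOGGER field, as elsewhere in this interface. A sketch of the slow-consumer check that would raise message 221048 (the surrounding rate-measuring code is assumed, not part of this diff):

   import org.hornetq.core.server.HornetQServerLogger;

   public final class SlowConsumerReporter
   {
      public static void reportIfSlow(String sessionID, long consumerID, String queueName,
                                      String remoteAddress, float thresholdRate, float actualRate)
      {
         if (actualRate < thresholdRate)
         {
            // Logs message 221048 ("Consumer {0}:{1} attached to queue '{2}' ... identified as slow") at INFO.
            HornetQServerLogger.LOGGER.slowConsumerDetected(sessionID, consumerID, queueName,
                                                            remoteAddress, thresholdRate, actualRate);
         }
      }
   }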

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/NodeManager.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/server/NodeManager.java 
b/hornetq-server/src/main/java/org/hornetq/core/server/NodeManager.java
index 2fe948b..049a3a6 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/NodeManager.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/NodeManager.java
@@ -39,7 +39,6 @@ public abstract class NodeManager implements HornetQComponent
    private final Object nodeIDGuard = new Object();
    private SimpleString nodeID;
    private UUID uuid;
-   private String nodeGroupName;
    private boolean isStarted = false;
 
    protected FileChannel channel;
@@ -122,16 +121,6 @@ public abstract class NodeManager implements 
HornetQComponent
       }
    }
 
-   public void setNodeGroupName(String nodeGroupName)
-   {
-      this.nodeGroupName = nodeGroupName;
-   }
-
-   public String getNodeGroupName()
-   {
-      return nodeGroupName;
-   }
-
    public abstract boolean isAwaitingFailback() throws Exception;
 
    public abstract boolean isBackupLive() throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/Queue.java 
b/hornetq-server/src/main/java/org/hornetq/core/server/Queue.java
index 7526b44..afa03ff 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/Queue.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/Queue.java
@@ -99,28 +99,8 @@ public interface Queue extends Bindable
 
    void destroyPaging() throws Exception;
 
-   /**
-    * It will wait for up to 10 seconds for a flush on the executors and 
return the number of messages added.
-    * if the executor is busy for any reason (say an unbehaved consumer) we 
will just return the current value.
-    *
-    * @return
-    */
    long getMessageCount();
 
-   /**
-    * This method will return the messages added after waiting some time on 
the flush executors.
-    * If the executor couldn't be flushed within the timeout we will just 
return the current value without any warn
-    *
-    * @param timeout Time to wait for current executors to finish in 
milliseconds.
-    * @return
-    */
-   long getMessageCount(long timeout);
-
-   /**
-    * Return the current message count without waiting for scheduled executors 
to finish
-    */
-   long getInstantMessageCount();
-
    int getDeliveringCount();
 
    void referenceHandled();
@@ -137,24 +117,9 @@ public interface Queue extends Bindable
     */
    Map<String, List<MessageReference>> getDeliveringMessages();
 
-   /**
-    * It will wait for up to 10 seconds for a flush on the executors and 
return the number of messages added.
-    * if the executor is busy for any reason (say an unbehaved consumer) we 
will just return the current value.
-    *
-    * @return
-    */
    long getMessagesAdded();
 
-   /**
-    * This method will return the messages added after waiting some time on 
the flush executors.
-    * If the executor couldn't be flushed within the timeout we will just 
return the current value without any warn
-    *
-    * @param timeout Time to wait for current executors to finish in 
milliseconds.
-    * @return
-    */
-   long getMessagesAdded(long timeout);
-
-   long getInstantMessagesAdded();
+   long getMessagesAcknowledged();
 
    MessageReference removeReferenceWithID(long id) throws Exception;
 
@@ -255,7 +220,13 @@ public interface Queue extends Bindable
 
    void resetMessagesAdded();
 
+   void resetMessagesAcknowledged();
+
    void incrementMesssagesAdded();
 
    List<MessageReference> cancelScheduledMessages();
+
+   void postAcknowledge(MessageReference ref);
+
+   float getRate();
 }
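
The timed and "instant" count variants are gone; getMessageCount() and getMessagesAdded() are the single accessors, while the new getMessagesAcknowledged() and getRate() feed the slow-consumer and management features. A small sketch of reading the new counters (getName() is assumed from the existing Queue API):

   import org.hornetq.core.server.Queue;

   public final class QueueStats
   {
      public static String summarize(Queue queue)
      {
         return queue.getName() + ": " + queue.getMessageCount() + " messages, " +
                queue.getMessagesAcknowledged() + " acknowledged, " +
                queue.getRate() + " msgs/sec consumed";
      }
   }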

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/ServerConsumer.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/server/ServerConsumer.java 
b/hornetq-server/src/main/java/org/hornetq/core/server/ServerConsumer.java
index 279cff7..6d94d01 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/ServerConsumer.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/ServerConsumer.java
@@ -25,25 +25,45 @@ import org.hornetq.core.transaction.Transaction;
  */
 public interface ServerConsumer extends Consumer
 {
+   /**
+    * @see #getProtocolContext()
+    * @param protocolContext
+    */
+   void setProtocolContext(Object protocolContext);
+
+   /**
+    * An object set by the Protocol implementation.
+    * It could be anything pre-determined by the implementation.
+    */
+   Object getProtocolContext();
+
    long getID();
 
    Object getConnectionID();
 
    void close(boolean failed) throws Exception;
 
+   /**
+    * This method only removes the consumer from its queues.
+    * If an exception occurs during close, the exception handling
+    * will call removeItself, which should take the consumer out of any queues.
+    * @throws Exception
+    */
+   void removeItself() throws Exception;
+
    List<MessageReference> cancelRefs(boolean failed, boolean 
lastConsumedAsDelivered, Transaction tx) throws Exception;
 
    void setStarted(boolean started);
 
-   void receiveCredits(int credits) throws Exception;
+   void receiveCredits(int credits);
 
    Queue getQueue();
 
    MessageReference removeReferenceByID(long messageID) throws Exception;
 
-   void acknowledge(boolean autoCommitAcks, Transaction tx, long messageID) 
throws Exception;
+   void acknowledge(Transaction tx, long messageID) throws Exception;
 
-   void individualAcknowledge(boolean autoCommitAcks, Transaction tx, long 
messageID) throws Exception;
+   void individualAcknowledge(Transaction tx, long messageID) throws Exception;
 
    void individualCancel(final long messageID, boolean failed) throws 
Exception;
 
@@ -56,6 +76,8 @@ public interface ServerConsumer extends Consumer
    long getCreationTime();
 
    String getSessionID();
+
+   void promptDelivery();
 }
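
The protocol context gives each protocol layer an opaque slot on the consumer, and receiveCredits() no longer declares a checked exception. A sketch of a protocol using the new methods:

   import org.hornetq.core.server.ServerConsumer;

   public final class ConsumerFlowControl
   {
      // Called once when the protocol creates the consumer.
      public static void attachProtocolState(ServerConsumer consumer, Object protocolState)
      {
         consumer.setProtocolContext(protocolState); // opaque to the core server
      }

      // receiveCredits() no longer throws, so no try/catch is needed here.
      public static void grantAndDeliver(ServerConsumer consumer, int credits)
      {
         consumer.receiveCredits(credits);
         consumer.promptDelivery();
      }
   }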
 
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/ServerMessage.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/server/ServerMessage.java 
b/hornetq-server/src/main/java/org/hornetq/core/server/ServerMessage.java
index cdbc028..6d84a3d 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/ServerMessage.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/ServerMessage.java
@@ -27,7 +27,7 @@ import org.hornetq.core.paging.PagingStore;
  */
 public interface ServerMessage extends MessageInternal, EncodingSupport
 {
-   void setMessageID(long id);
+   ServerMessage setMessageID(long id);
 
    MessageReference createReference(Queue queue);
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/server/ServerSession.java 
b/hornetq-server/src/main/java/org/hornetq/core/server/ServerSession.java
index 2fc4cb3..e8d523e 100644
--- a/hornetq-server/src/main/java/org/hornetq/core/server/ServerSession.java
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/ServerSession.java
@@ -43,7 +43,7 @@ public interface ServerSession
 
    RemotingConnection getRemotingConnection();
 
-   void removeConsumer(long consumerID) throws Exception;
+   boolean removeConsumer(long consumerID) throws Exception;
 
    void acknowledge(long consumerID, long messageID) throws Exception;
 
@@ -95,7 +95,7 @@ public interface ServerSession
 
    void deleteQueue(SimpleString name) throws Exception;
 
-   void createConsumer(long consumerID, SimpleString queueName, SimpleString 
filterString, boolean browseOnly) throws Exception;
+   ServerConsumer createConsumer(long consumerID, SimpleString queueName, 
SimpleString filterString, boolean browseOnly) throws Exception;
 
    QueueQueryResult executeQueueQuery(SimpleString name) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/ServerSessionFactory.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/server/ServerSessionFactory.java
 
b/hornetq-server/src/main/java/org/hornetq/core/server/ServerSessionFactory.java
new file mode 100644
index 0000000..8a752e8
--- /dev/null
+++ 
b/hornetq-server/src/main/java/org/hornetq/core/server/ServerSessionFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.persistence.OperationContext;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.security.SecurityStore;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.server.impl.ServerSessionImpl;
+import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.core.transaction.ResourceManager;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.protocol.SessionCallback;
+
+public interface ServerSessionFactory
+{
+
+   ServerSessionImpl createCoreSession(String name, String username, String 
password,
+         int minLargeMessageSize, boolean autoCommitSends,
+         boolean autoCommitAcks, boolean preAcknowledge,
+         boolean persistDeliveryCountBeforeDelivery, boolean xa,
+         RemotingConnection connection, StorageManager storageManager,
+         PostOffice postOffice, ResourceManager resourceManager,
+         SecurityStore securityStore, ManagementService managementService,
+         HornetQServerImpl hornetQServerImpl, SimpleString managementAddress,
+         SimpleString simpleString, SessionCallback callback,
+         OperationContext context) throws Exception;
+
+}
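
Because ServerSessionFactory is a plain single-method interface, a protocol implementation (or a test) can decorate whatever factory it is given before passing it into createSession. A sketch of such a wrapper, purely for illustration:

   import java.util.concurrent.atomic.AtomicInteger;

   import org.hornetq.api.core.SimpleString;
   import org.hornetq.core.persistence.OperationContext;
   import org.hornetq.core.persistence.StorageManager;
   import org.hornetq.core.postoffice.PostOffice;
   import org.hornetq.core.security.SecurityStore;
   import org.hornetq.core.server.ServerSessionFactory;
   import org.hornetq.core.server.impl.HornetQServerImpl;
   import org.hornetq.core.server.impl.ServerSessionImpl;
   import org.hornetq.core.server.management.ManagementService;
   import org.hornetq.core.transaction.ResourceManager;
   import org.hornetq.spi.core.protocol.RemotingConnection;
   import org.hornetq.spi.core.protocol.SessionCallback;

   public class CountingSessionFactory implements ServerSessionFactory
   {
      private final ServerSessionFactory delegate;
      private final AtomicInteger created = new AtomicInteger();

      public CountingSessionFactory(ServerSessionFactory delegate)
      {
         this.delegate = delegate;
      }

      public int getCreatedCount()
      {
         return created.get();
      }

      @Override
      public ServerSessionImpl createCoreSession(String name, String username, String password,
            int minLargeMessageSize, boolean autoCommitSends, boolean autoCommitAcks,
            boolean preAcknowledge, boolean persistDeliveryCountBeforeDelivery, boolean xa,
            RemotingConnection connection, StorageManager storageManager, PostOffice postOffice,
            ResourceManager resourceManager, SecurityStore securityStore,
            ManagementService managementService, HornetQServerImpl hornetQServerImpl,
            SimpleString managementAddress, SimpleString simpleString, SessionCallback callback,
            OperationContext context) throws Exception
      {
         created.incrementAndGet(); // protocol-specific bookkeeping could go here
         return delegate.createCoreSession(name, username, password, minLargeMessageSize,
               autoCommitSends, autoCommitAcks, preAcknowledge, persistDeliveryCountBeforeDelivery,
               xa, connection, storageManager, postOffice, resourceManager, securityStore,
               managementService, hornetQServerImpl, managementAddress, simpleString, callback,
               context);
      }
   }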

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/BackupManager.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/BackupManager.java
 
b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/BackupManager.java
index e313fab..37cdea5 100644
--- 
a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/BackupManager.java
+++ 
b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/BackupManager.java
@@ -29,7 +29,6 @@ import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.client.impl.Topology;
 import org.hornetq.core.config.ClusterConnectionConfiguration;
 import org.hornetq.core.config.Configuration;
-import org.hornetq.core.protocol.ServerPacketDecoder;
 import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.HornetQServerLogger;
@@ -80,7 +79,7 @@ public class BackupManager implements HornetQComponent
       for (BackupConnector conn : backupConnectors)
       {
          conn.start();
-         if (configuration.getHAPolicy().isBackup() && 
configuration.getHAPolicy().isSharedStore())
+         if (server.getHAPolicy().isBackup() && 
server.getHAPolicy().isSharedStore())
          {
             conn.informTopology();
             conn.announceBackup();
@@ -219,7 +218,7 @@ public class BackupManager implements HornetQComponent
             backupServerLocator.setIdentity("backupLocatorFor='" + server + 
"'");
             backupServerLocator.setReconnectAttempts(-1);
             backupServerLocator.setInitialConnectAttempts(-1);
-            backupServerLocator.setPacketDecoder(ServerPacketDecoder.INSTANCE);
+            
backupServerLocator.setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance());
          }
       }
 
@@ -260,8 +259,8 @@ public class BackupManager implements HornetQComponent
                      clusterControl.authorize();
                      
clusterControl.sendNodeAnnounce(System.currentTimeMillis(),
                                                       
nodeManager.getNodeId().toString(),
-                                                      
configuration.getHAPolicy().getBackupGroupName(),
-                                                      
configuration.getHAPolicy().getScaleDownClustername(),
+                                                      
server.getHAPolicy().getBackupGroupName(),
+                                                      
server.getHAPolicy().getScaleDownClustername(),
                                                       true,
                                                       connector,
                                                       null);
@@ -372,7 +371,7 @@ public class BackupManager implements HornetQComponent
             }
             ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, 
tcConfigs);
             locator.setClusterConnection(true);
-            locator.setPacketDecoder(ServerPacketDecoder.INSTANCE);
+            
locator.setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance());
             return locator;
          }
          return null;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterControl.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterControl.java
 
b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterControl.java
index d3907f9..0ace46c 100644
--- 
a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterControl.java
+++ 
b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterControl.java
@@ -92,10 +92,10 @@ public class ClusterControl implements AutoCloseable
     *                           server.
     * @throws org.hornetq.api.core.HornetQException
     */
-   public void announceReplicatingBackupToLive(final boolean 
attemptingFailBack) throws HornetQException
+   public void announceReplicatingBackupToLive(final boolean 
attemptingFailBack, String replicationClusterName) throws HornetQException
    {
 
-      ClusterConnectionConfiguration config = 
ConfigurationUtils.getReplicationClusterConfiguration(server.getConfiguration());
+      ClusterConnectionConfiguration config = 
ConfigurationUtils.getReplicationClusterConfiguration(server.getConfiguration(),
 replicationClusterName);
       if (config == null)
       {
          HornetQServerLogger.LOGGER.announceBackupNoClusterConnections();

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterController.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterController.java
 
b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterController.java
index 6d7abca..8c028fe 100644
--- 
a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterController.java
+++ 
b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterController.java
@@ -21,7 +21,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.HornetQAlreadyReplicatingException;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Interceptor;
 import org.hornetq.api.core.Pair;
@@ -34,16 +33,11 @@ import 
org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ServerLocatorImpl;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.client.impl.Topology;
-import org.hornetq.core.protocol.ServerPacketDecoder;
 import org.hornetq.core.protocol.core.Channel;
 import org.hornetq.core.protocol.core.ChannelHandler;
 import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.impl.PacketImpl;
-import 
org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
-import 
org.hornetq.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.BackupRequestMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.BackupResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ClusterConnectMessage;
 import 
org.hornetq.core.protocol.core.impl.wireformat.ClusterConnectReplyMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
@@ -53,10 +47,10 @@ import 
org.hornetq.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage;
 import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.HornetQServerLogger;
-import org.hornetq.core.server.cluster.ha.HAPolicy;
 import org.hornetq.core.server.cluster.qourum.QuorumManager;
 import org.hornetq.core.server.cluster.qourum.QuorumVoteHandler;
 import org.hornetq.core.server.cluster.qourum.Vote;
+import org.hornetq.core.server.impl.Activation;
 import org.hornetq.spi.core.remoting.Acceptor;
 
 /**
@@ -83,6 +77,7 @@ public class ClusterController implements HornetQComponent
    private CountDownLatch replicationClusterConnectedLatch;
 
    private boolean started;
+   private SimpleString replicatedClusterName;
 
    public ClusterController(HornetQServer server, ScheduledExecutorService 
scheduledExecutor)
    {
@@ -99,9 +94,9 @@ public class ClusterController implements HornetQComponent
       //set the default locator that will be used to connecting to the default 
cluster.
       defaultLocator = locators.get(defaultClusterConnectionName);
       //create a locator for replication, either the default or the specified 
if not set
-      if (server.getConfiguration().getHAPolicy().getReplicationClustername() 
!= null && 
!server.getConfiguration().getHAPolicy().getReplicationClustername().equals(defaultClusterConnectionName.toString()))
+      if (replicatedClusterName != null && 
!replicatedClusterName.equals(defaultClusterConnectionName))
       {
-         replicationLocator = 
locators.get(server.getConfiguration().getHAPolicy().getReplicationClustername());
+         replicationLocator = locators.get(replicatedClusterName);
          if (replicationLocator == null)
          {
             
HornetQServerLogger.LOGGER.noClusterConnectionForReplicationCluster();
@@ -172,7 +167,7 @@ public class ClusterController implements HornetQComponent
       serverLocator.setReconnectAttempts(-1);
       serverLocator.setInitialConnectAttempts(-1);
       //this is used for replication so need to use the server packet decoder
-      serverLocator.setPacketDecoder(ServerPacketDecoder.INSTANCE);
+      
serverLocator.setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance());
       locators.put(name, serverLocator);
    }
 
@@ -189,7 +184,7 @@ public class ClusterController implements HornetQComponent
       serverLocator.setReconnectAttempts(-1);
       serverLocator.setInitialConnectAttempts(-1);
       //this is used for replication so need to use the server packet decoder
-      serverLocator.setPacketDecoder(ServerPacketDecoder.INSTANCE);
+      
serverLocator.setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance());
       locators.put(name, serverLocator);
    }
 
@@ -253,7 +248,7 @@ public class ClusterController implements HornetQComponent
     */
    public ClusterControl connectToNodeInCluster(ClientSessionFactoryInternal 
sf)
    {
-      
((ServerLocatorInternal)sf.getServerLocator()).setPacketDecoder(ServerPacketDecoder.INSTANCE);
+      
sf.getServerLocator().setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance());
       return new ClusterControl(sf, server);
    }
 
@@ -280,14 +275,14 @@ public class ClusterController implements HornetQComponent
 
    /**
     * used to set a channel handler on the connection that can be used by the 
cluster control
-    *
-    * @param channel the channel to set the handler
+    *  @param channel the channel to set the handler
     * @param acceptorUsed the acceptor used for connection
     * @param remotingConnection the connection itself
+    * @param activation
     */
-   public void addClusterChannelHandler(Channel channel, Acceptor 
acceptorUsed, CoreRemotingConnection remotingConnection)
+   public void addClusterChannelHandler(Channel channel, Acceptor 
acceptorUsed, CoreRemotingConnection remotingConnection, Activation activation)
    {
-      channel.setHandler(new ClusterControllerChannelHandler(channel, 
acceptorUsed, remotingConnection));
+      channel.setHandler(new ClusterControllerChannelHandler(channel, 
acceptorUsed, remotingConnection, 
activation.getActivationChannelHandler(channel, acceptorUsed)));
    }
 
    public int getDefaultClusterSize()
@@ -310,6 +305,11 @@ public class ClusterController implements HornetQComponent
       return server.getIdentity();
    }
 
+   public void setReplicatedClusterName(String replicatedClusterName)
+   {
+      this.replicatedClusterName = new SimpleString(replicatedClusterName);
+   }
+
    /**
     * a handler for handling packets sent between the cluster.
     */
@@ -318,13 +318,15 @@ public class ClusterController implements HornetQComponent
       private final Channel clusterChannel;
       private final Acceptor acceptorUsed;
       private final CoreRemotingConnection remotingConnection;
+      private final ChannelHandler channelHandler;
       boolean authorized = false;
 
-      public ClusterControllerChannelHandler(Channel clusterChannel, Acceptor 
acceptorUsed, CoreRemotingConnection remotingConnection)
+      public ClusterControllerChannelHandler(Channel clusterChannel, Acceptor 
acceptorUsed, CoreRemotingConnection remotingConnection, ChannelHandler 
channelHandler)
       {
          this.clusterChannel = clusterChannel;
          this.acceptorUsed = acceptorUsed;
          this.remotingConnection = remotingConnection;
+         this.channelHandler = channelHandler;
       }
 
       @Override
@@ -336,6 +338,12 @@ public class ClusterController implements HornetQComponent
             {
                ClusterConnection clusterConnection = 
acceptorUsed.getClusterConnection();
 
+               //if this acceptor isn't associated with a cluster connection 
use the default
+               if (clusterConnection == null)
+               {
+                  clusterConnection = 
server.getClusterManager().getDefaultConnection(null);
+               }
+
                ClusterConnectMessage msg = (ClusterConnectMessage) packet;
 
                if (server.getConfiguration().isSecurityEnabled() && 
!clusterConnection.verify(msg.getClusterUser(), msg.getClusterPassword()))
@@ -387,24 +395,6 @@ public class ClusterController implements HornetQComponent
                   HornetQServerLogger.LOGGER.debug("there is no acceptor used 
configured at the CoreProtocolManager " + this);
                }
             }
-            else if (packet.getType() == PacketImpl.BACKUP_REGISTRATION)
-            {
-               BackupRegistrationMessage msg = 
(BackupRegistrationMessage)packet;
-               ClusterConnection clusterConnection = 
acceptorUsed.getClusterConnection();
-               try
-               {
-                  server.startReplication(remotingConnection, 
clusterConnection, getPair(msg.getConnector(), true),
-                        msg.isFailBackRequest());
-               }
-               catch (HornetQAlreadyReplicatingException are)
-               {
-                  clusterChannel.send(new 
BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.ALREADY_REPLICATING));
-               }
-               catch (HornetQException e)
-               {
-                  clusterChannel.send(new 
BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.EXCEPTION));
-               }
-            }
             else if (packet.getType() == PacketImpl.QUORUM_VOTE)
             {
                QuorumVoteMessage quorumVoteMessage = (QuorumVoteMessage) 
packet;
@@ -413,31 +403,6 @@ public class ClusterController implements HornetQComponent
                Vote vote = quorumManager.vote(quorumVoteMessage.getHandler(), 
quorumVoteMessage.getVote());
                clusterChannel.send(new 
QuorumVoteReplyMessage(quorumVoteMessage.getHandler(), vote));
             }
-            else if (packet.getType() == PacketImpl.BACKUP_REQUEST)
-            {
-               BackupRequestMessage backupRequestMessage = 
(BackupRequestMessage) packet;
-               boolean started = false;
-               try
-               {
-                  if (backupRequestMessage.getBackupType() == 
HAPolicy.POLICY_TYPE.COLOCATED_REPLICATED)
-                  {
-                     started = 
server.getClusterManager().getHAManager().activateReplicatedBackup(backupRequestMessage.getBackupSize(),
 backupRequestMessage.getNodeID());
-                  }
-                  else
-                  {
-                     started = 
server.getClusterManager().getHAManager().activateSharedStoreBackup(backupRequestMessage.getBackupSize(),
-                           backupRequestMessage.getJournalDirectory(),
-                           backupRequestMessage.getBindingsDirectory(),
-                           backupRequestMessage.getLargeMessagesDirectory(),
-                           backupRequestMessage.getPagingDirectory());
-                  }
-               }
-               catch (Exception e)
-               {
-                  //todo log a warning and send false
-               }
-               clusterChannel.send(new BackupResponseMessage(started));
-            }
             else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT)
             {
                ScaleDownAnnounceMessage message = (ScaleDownAnnounceMessage) 
packet;
@@ -447,17 +412,13 @@ public class ClusterController implements HornetQComponent
                   server.addScaledDownNode(message.getScaledDownNodeId());
                }
             }
+            else if (channelHandler != null)
+            {
+               channelHandler.handlePacket(packet);
+            }
          }
       }
-      private Pair<TransportConfiguration, TransportConfiguration> 
getPair(TransportConfiguration conn,
-                                                                           
boolean isBackup)
-      {
-         if (isBackup)
-         {
-            return new Pair<>(null, conn);
-         }
-         return new Pair<>(conn, null);
-      }
+
    }
 
    /**
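
With the backup-registration and backup-request packets moved out of this handler, anything the cluster controller does not recognise is now offered to the ChannelHandler supplied by the server's Activation. A minimal sketch of that fallback contract (the helper class is illustrative only):

   import org.hornetq.core.protocol.core.ChannelHandler;
   import org.hornetq.core.protocol.core.Packet;

   public final class ClusterPacketFallback
   {
      // Mirrors the new else-branch above: unhandled packets go to the
      // Activation-supplied handler, when one was registered.
      public static void dispatchUnhandled(Packet packet, ChannelHandler activationHandler)
      {
         if (activationHandler != null)
         {
            activationHandler.handlePacket(packet);
         }
      }
   }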

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterManager.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterManager.java
 
b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterManager.java
index bd9ba38..62e8f22 100644
--- 
a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterManager.java
+++ 
b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterManager.java
@@ -55,8 +55,8 @@ import org.hornetq.core.server.cluster.impl.BridgeImpl;
 import org.hornetq.core.server.cluster.impl.BroadcastGroupImpl;
 import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
 import org.hornetq.core.server.cluster.qourum.QuorumManager;
+import org.hornetq.core.server.impl.Activation;
 import org.hornetq.core.server.management.ManagementService;
-import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.utils.ConcurrentHashSet;
@@ -66,8 +66,8 @@ import org.hornetq.utils.FutureLatch;
 /**
  * A ClusterManager manages {@link ClusterConnection}s, {@link 
BroadcastGroup}s and {@link Bridge}s.
  * <p/>
- * Note that {@link ClusterConnectionBridge}s extend Bridges but are 
controlled over through
- * {@link ClusterConnectionImpl}. As a node is discovered a new {@link 
ClusterConnectionBridge} is
+ * Note that {@link 
org.hornetq.core.server.cluster.impl.ClusterConnectionBridge}s extend Bridges 
but are controlled through
+ * {@link ClusterConnectionImpl}. As a node is discovered a new {@link 
org.hornetq.core.server.cluster.impl.ClusterConnectionBridge} is
  * deployed.
  *
  * @author <a href="mailto:[email protected]";>Tim Fox</a>
@@ -112,9 +112,9 @@ public final class ClusterManager implements 
HornetQComponent
       return haManager;
    }
 
-   public void addClusterChannelHandler(Channel channel, Acceptor 
acceptorUsed, CoreRemotingConnection remotingConnection)
+   public void addClusterChannelHandler(Channel channel, Acceptor 
acceptorUsed, CoreRemotingConnection remotingConnection, Activation activation)
    {
-      clusterController.addClusterChannelHandler(channel, acceptorUsed, 
remotingConnection);
+      clusterController.addClusterChannelHandler(channel, acceptorUsed, 
remotingConnection, activation);
    }
 
    enum State
@@ -172,7 +172,7 @@ public final class ClusterManager implements 
HornetQComponent
 
       clusterController = new ClusterController(server, scheduledExecutor);
 
-      haManager = new HAManager(server.getConfiguration().getHAPolicy(), 
server.getSecurityManager(), server, 
server.getConfiguration().getBackupServerConfigurations());
+      haManager = server.getActivation().getHAManager();
    }
 
    public String describe()
@@ -235,12 +235,12 @@ public final class ClusterManager implements 
HornetQComponent
 
    public String getBackupGroupName()
    {
-      return configuration.getHAPolicy().getBackupGroupName();
+      return server.getHAPolicy().getBackupGroupName();
    }
 
    public String getScaleDownGroupName()
    {
-      return haManager.getHAPolicy().getScaleDownGroupName();
+      return server.getHAPolicy().getScaleDownGroupName();
    }
 
    public synchronized void deploy() throws Exception
@@ -514,30 +514,6 @@ public final class ClusterManager implements 
HornetQComponent
 
       }
 
-      if (config.getForwardingAddress() != null)
-      {
-         AddressSettings addressConfig = 
configuration.getAddressesSettings().get(config.getForwardingAddress());
-
-         // The address config could be null on certain test cases or some 
Embedded environment
-         if (addressConfig == null)
-         {
-            // We will certainly have this warning on testcases which is ok
-            
HornetQServerLogger.LOGGER.bridgeCantFindAddressConfig(config.getName(), 
config.getForwardingAddress());
-         }
-         else
-         {
-            final int windowSize = config.getConfirmationWindowSize();
-            final long maxBytes = addressConfig.getMaxSizeBytes();
-
-            if (maxBytes != -1 && maxBytes < windowSize)
-            {
-               
HornetQServerLogger.LOGGER.bridgeConfirmationWindowTooSmall(config.getName(),
-                                                                           
config.getForwardingAddress(), windowSize,
-                                                                           
maxBytes);
-            }
-         }
-      }
-
       serverLocator.setIdentity("Bridge " + config.getName());
       
serverLocator.setConfirmationWindowSize(config.getConfirmationWindowSize());
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/HornetQServerSideProtocolManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/HornetQServerSideProtocolManagerFactory.java
 
b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/HornetQServerSideProtocolManagerFactory.java
new file mode 100644
index 0000000..b45cf94
--- /dev/null
+++ 
b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/HornetQServerSideProtocolManagerFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.cluster;
+
+import org.hornetq.core.protocol.ServerPacketDecoder;
+import org.hornetq.core.protocol.core.impl.HornetQClientProtocolManager;
+import org.hornetq.core.protocol.core.impl.PacketDecoder;
+import org.hornetq.spi.core.remoting.ClientProtocolManager;
+import org.hornetq.spi.core.remoting.ClientProtocolManagerFactory;
+
+/**
+ * A protocol manager that will replace the packet manager for inter-server 
communications
+ * @author Clebert Suconic
+ */
+public class HornetQServerSideProtocolManagerFactory implements 
ClientProtocolManagerFactory
+{
+   private static final HornetQServerSideProtocolManagerFactory INSTANCE = new 
 HornetQServerSideProtocolManagerFactory();
+
+   public static HornetQServerSideProtocolManagerFactory getInstance()
+   {
+      return INSTANCE;
+   }
+
+   private HornetQServerSideProtocolManagerFactory()
+   {
+   }
+
+   private static final long serialVersionUID = 1;
+
+   @Override
+   public ClientProtocolManager newProtocolManager()
+   {
+      return new HornetQReplicationProtocolManager();
+   }
+
+   class HornetQReplicationProtocolManager extends HornetQClientProtocolManager
+   {
+      @Override
+      protected PacketDecoder getPacketDecoder()
+      {
+         return ServerPacketDecoder.INSTANCE;
+      }
+   }
+}
\ No newline at end of file
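
The factory is consumed by the cluster code shown earlier in this commit: instead of casting the locator and calling setPacketDecoder(ServerPacketDecoder.INSTANCE), server-to-server locators now install this factory. A sketch of that wiring, assuming the setter is exposed on the ServerLocator API as the calls in this diff suggest:

   import org.hornetq.api.core.client.ServerLocator;
   import org.hornetq.core.server.cluster.HornetQServerSideProtocolManagerFactory;

   public final class ServerLocators
   {
      public static void useServerSideDecoder(ServerLocator locator)
      {
         // Replaces the old locator.setPacketDecoder(ServerPacketDecoder.INSTANCE) pattern.
         locator.setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance());
      }
   }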

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/BackupPolicy.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/BackupPolicy.java
 
b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/BackupPolicy.java
new file mode 100644
index 0000000..f6749df
--- /dev/null
+++ 
b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/BackupPolicy.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.cluster.ha;
+
+import org.hornetq.api.config.HornetQDefaultConfiguration;
+import org.hornetq.core.server.impl.Activation;
+
+public abstract class BackupPolicy implements HAPolicy<Activation>
+{
+   protected ScaleDownPolicy scaleDownPolicy;
+   protected boolean restartBackup = 
HornetQDefaultConfiguration.isDefaultRestartBackup();
+
+   public ScaleDownPolicy getScaleDownPolicy()
+   {
+      return scaleDownPolicy;
+   }
+
+   public void setScaleDownPolicy(ScaleDownPolicy scaleDownPolicy)
+   {
+      this.scaleDownPolicy = scaleDownPolicy;
+   }
+
+
+   @Override
+   public boolean isBackup()
+   {
+      return true;
+   }
+
+   @Override
+   public String getScaleDownClustername()
+   {
+      return null;
+   }
+
+   @Override
+   public String getScaleDownGroupName()
+   {
+      return getScaleDownPolicy() != null ? 
getScaleDownPolicy().getGroupName() : null;
+   }
+
+   public boolean isRestartBackup()
+   {
+      return restartBackup;
+   }
+
+   public void setRestartBackup(boolean restartBackup)
+   {
+      this.restartBackup = restartBackup;
+   }
+}
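
Concrete backup policies (for shared-store or replicating backups) are expected to extend this class; the colocated code in ColocatedHAManager below reuses the parent's BackupPolicy for each colocated backup, with restart disabled. A small sketch of that wiring using the new HornetQServer.setHAPolicy accessor:

   import org.hornetq.core.server.HornetQServer;
   import org.hornetq.core.server.cluster.ha.BackupPolicy;

   public final class ColocatedBackupWiring
   {
      public static void install(HornetQServer backup, BackupPolicy policy)
      {
         policy.setRestartBackup(false); // colocated backups must not restart themselves
         backup.setHAPolicy(policy);
      }
   }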

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ColocatedHAManager.java
----------------------------------------------------------------------
diff --git 
a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ColocatedHAManager.java
 
b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ColocatedHAManager.java
new file mode 100644
index 0000000..85e97a0
--- /dev/null
+++ 
b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ColocatedHAManager.java
@@ -0,0 +1,313 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.cluster.ha;
+
+import org.hornetq.api.core.Pair;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.TopologyMember;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.server.ActivationParams;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServerLogger;
+import org.hornetq.core.server.cluster.ClusterControl;
+import org.hornetq.core.server.cluster.ClusterController;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ColocatedHAManager implements HAManager
+{
+
+   private final ColocatedPolicy haPolicy;
+
+   private final HornetQServer server;
+
+   private Map<String, HornetQServer> backupServers = new HashMap<>();
+
+   private boolean started;
+
+   public ColocatedHAManager(ColocatedPolicy haPolicy, HornetQServer 
hornetQServer)
+   {
+      this.haPolicy = haPolicy;
+      server = hornetQServer;
+   }
+
+   /**
+    * starts the HA manager.
+    */
+   public void start()
+   {
+      if (started)
+         return;
+
+      server.getActivation().haStarted();
+
+      started = true;
+   }
+
+   /**
+    * stop any backups
+    */
+   public void stop()
+   {
+      for (HornetQServer hornetQServer : backupServers.values())
+      {
+         try
+         {
+            hornetQServer.stop();
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+            //todo
+         }
+      }
+      backupServers.clear();
+      started = false;
+   }
+
+   @Override
+   public boolean isStarted()
+   {
+      return started;
+   }
+
+   public synchronized boolean activateBackup(int backupSize, String 
journalDirectory, String bindingsDirectory, String largeMessagesDirectory, 
String pagingDirectory, SimpleString nodeID) throws Exception
+   {
+      if (backupServers.size() >= haPolicy.getMaxBackups() || backupSize != 
backupServers.size())
+      {
+         return false;
+      }
+      if (haPolicy.getBackupPolicy().isSharedStore())
+      {
+         return activateSharedStoreBackup(journalDirectory, bindingsDirectory, 
largeMessagesDirectory, pagingDirectory);
+      }
+      else
+      {
+         return activateReplicatedBackup(nodeID);
+      }
+   }
+
+
+   /**
+    * return the current backup servers
+    *
+    * @return the backups
+    */
+   public Map<String, HornetQServer> getBackupServers()
+   {
+      return backupServers;
+   }
+
+   /**
+    * send a request to a live server to start a backup for us
+    *
+    * @param connectorPair the connector for the node to request a backup from
+    * @param backupSize the current size of the requested node's backups
+    * @param replicated
+    * @return true if the request was successful.
+    * @throws Exception
+    */
+   public boolean requestBackup(Pair<TransportConfiguration, 
TransportConfiguration> connectorPair, int backupSize, boolean replicated) 
throws Exception
+   {
+      ClusterController clusterController = 
server.getClusterManager().getClusterController();
+      try
+            (
+                  ClusterControl clusterControl = 
clusterController.connectToNode(connectorPair.getA());
+            )
+      {
+         clusterControl.authorize();
+         if (replicated)
+         {
+            return clusterControl.requestReplicatedBackup(backupSize, 
server.getNodeID());
+         }
+         else
+         {
+            return clusterControl.requestSharedStoreBackup(backupSize,
+                  server.getConfiguration().getJournalDirectory(),
+                  server.getConfiguration().getBindingsDirectory(),
+                  server.getConfiguration().getLargeMessagesDirectory(),
+                  server.getConfiguration().getPagingDirectory());
+
+         }
+      }
+   }
+
+   private synchronized boolean activateSharedStoreBackup(String 
journalDirectory, String bindingsDirectory, String largeMessagesDirectory, 
String pagingDirectory) throws Exception
+   {
+      Configuration configuration = server.getConfiguration().copy();
+      HornetQServer backup = server.createBackupServer(configuration);
+      try
+      {
+         int portOffset = haPolicy.getBackupPortOffset() * 
(backupServers.size() + 1);
+         String name = "colocated_backup_" + backupServers.size() + 1;
+         //make sure we don't restart as we are colocated
+         haPolicy.getBackupPolicy().setRestartBackup(false);
+         //set the backup policy
+         backup.setHAPolicy(haPolicy.getBackupPolicy());
+         updateSharedStoreConfiguration(configuration, name, portOffset, 
haPolicy.getExcludedConnectors(), journalDirectory, bindingsDirectory, 
largeMessagesDirectory, pagingDirectory, 
haPolicy.getBackupPolicy().getScaleDownPolicy() == null);
+
+         backupServers.put(configuration.getName(), backup);
+         backup.start();
+      }
+      catch (Exception e)
+      {
+         backup.stop();
+         HornetQServerLogger.LOGGER.activateSharedStoreSlaveFailed(e);
+         return false;
+      }
+      HornetQServerLogger.LOGGER.activatingSharedStoreSlave();
+      return true;
+   }
+
+   /**
+    * activate a backup server replicating from a specified node.
+    *
+    * decline and the requesting server can cast a re vote
+    * @param nodeID the id of the node to replicate from
+    * @return true if the server was created and started
+    * @throws Exception
+    */
+   private synchronized boolean activateReplicatedBackup(SimpleString nodeID) throws Exception
+   {
+      Configuration configuration = server.getConfiguration().copy();
+      HornetQServer backup = server.createBackupServer(configuration);
+      try
+      {
+         TopologyMember member = server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeID.toString());
+         int portOffset = haPolicy.getBackupPortOffset() * (backupServers.size() + 1);
+         String name = "colocated_backup_" + (backupServers.size() + 1);
+         //make sure we don't restart as we are colocated
+         haPolicy.getBackupPolicy().setRestartBackup(false);
+         //set the backup policy
+         backup.setHAPolicy(haPolicy.getBackupPolicy());
+         updateReplicatedConfiguration(configuration, name, portOffset, haPolicy.getExcludedConnectors(), haPolicy.getBackupPolicy().getScaleDownPolicy() == null);
+         backup.addActivationParam(ActivationParams.REPLICATION_ENDPOINT, member);
+         backupServers.put(configuration.getName(), backup);
+         backup.start();
+      }
+      catch (Exception e)
+      {
+         backup.stop();
+         HornetQServerLogger.LOGGER.activateReplicatedBackupFailed(e);
+         return false;
+      }
+      HornetQServerLogger.LOGGER.activatingReplica(nodeID);
+      return true;
+   }
+
+   /**
+    * update the backup's configuration
+    *
+    * @param backupConfiguration the configuration to update
+    * @param name the new name of the backup
+    * @param portOffset the offset for the acceptors and any connectors that need changing
+    * @param remoteConnectors the connectors that don't need offsetting, typically remote
+    * @param journalDirectory the journal directory the backup should use
+    * @param bindingsDirectory the bindings directory the backup should use
+    * @param largeMessagesDirectory the large messages directory the backup should use
+    * @param pagingDirectory the paging directory the backup should use
+    * @param fullServer whether the backup is a full server (false when it will scale down)
+    */
+   private static void updateSharedStoreConfiguration(Configuration backupConfiguration,
+                                                      String name,
+                                                      int portOffset,
+                                                      List<String> remoteConnectors,
+                                                      String journalDirectory,
+                                                      String bindingsDirectory,
+                                                      String largeMessagesDirectory,
+                                                      String pagingDirectory,
+                                                      boolean fullServer)
+   {
+      backupConfiguration.setName(name);
+      backupConfiguration.setJournalDirectory(journalDirectory);
+      backupConfiguration.setBindingsDirectory(bindingsDirectory);
+      backupConfiguration.setLargeMessagesDirectory(largeMessagesDirectory);
+      backupConfiguration.setPagingDirectory(pagingDirectory);
+      updateAcceptorsAndConnectors(backupConfiguration, portOffset, remoteConnectors, fullServer);
+   }
+
+   /**
+    * update the backup's configuration
+    *
+    * @param backupConfiguration the configuration to update
+    * @param name the new name of the backup
+    * @param portOffset the offset for the acceptors and any connectors that need changing
+    * @param remoteConnectors the connectors that don't need offsetting, typically remote
+    * @param fullServer whether the backup is a full server (false when it will scale down)
+    */
+   private static void updateReplicatedConfiguration(Configuration backupConfiguration,
+                                                     String name,
+                                                     int portOffset,
+                                                     List<String> remoteConnectors,
+                                                     boolean fullServer)
+   {
+      backupConfiguration.setName(name);
+      backupConfiguration.setJournalDirectory(backupConfiguration.getJournalDirectory() + name);
+      backupConfiguration.setPagingDirectory(backupConfiguration.getPagingDirectory() + name);
+      backupConfiguration.setLargeMessagesDirectory(backupConfiguration.getLargeMessagesDirectory() + name);
+      backupConfiguration.setBindingsDirectory(backupConfiguration.getBindingsDirectory() + name);
+      updateAcceptorsAndConnectors(backupConfiguration, portOffset, remoteConnectors, fullServer);
+   }
+
+   private static void updateAcceptorsAndConnectors(Configuration backupConfiguration,
+                                                    int portOffset,
+                                                    List<String> remoteConnectors,
+                                                    boolean fullServer)
+   {
+      // we only do this if we are a full server; if scaling down then our acceptors won't be needed
+      // and our connectors will be the same as the parent server's
+      if (fullServer)
+      {
+         Set<TransportConfiguration> acceptors = backupConfiguration.getAcceptorConfigurations();
+         for (TransportConfiguration acceptor : acceptors)
+         {
+            updatebackupParams(backupConfiguration.getName(), portOffset, acceptor.getParams());
+         }
+         Map<String, TransportConfiguration> connectorConfigurations = backupConfiguration.getConnectorConfigurations();
+         for (Map.Entry<String, TransportConfiguration> entry : connectorConfigurations.entrySet())
+         {
+            // check to make sure this isn't a remote connector, as those shouldn't be changed
+            if (!remoteConnectors.contains(entry.getValue().getName()))
+            {
+               updatebackupParams(backupConfiguration.getName(), portOffset, entry.getValue().getParams());
+            }
+         }
+      }
+      else
+      {
+         // if we are scaling down then we won't need any acceptors, but clear anyway for belts and braces
+         backupConfiguration.getAcceptorConfigurations().clear();
+      }
+   }
+
+   private static void updatebackupParams(String name, int portOffset, Map<String, Object> params)
+   {
+      if (params != null)
+      {
+         Object port = params.get("port");
+         if (port != null)
+         {
+            Integer integer = Integer.valueOf(port.toString());
+            integer += portOffset;
+            params.put("port", integer.toString());
+         }
+         Object serverId = params.get("server-id");
+         if (serverId != null)
+         {
+            params.put("server-id", serverId.toString() + "(" + name + ")");
+         }
+      }
+   }
+}

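A note on the offsetting logic in updatebackupParams above: each colocated backup shifts the "port" param of its acceptors and non-remote connectors by getBackupPortOffset() * (number of existing backups + 1), and suffixes any "server-id" with the backup's name so the ids stay unique. Below is a minimal standalone sketch of that transformation; the port 5445, the offset 100 and the class name are illustrative assumptions, not values taken from this commit.

import java.util.HashMap;
import java.util.Map;

public class BackupPortOffsetExample
{
   public static void main(String[] args)
   {
      // hypothetical acceptor params for a live server listening on port 5445
      Map<String, Object> params = new HashMap<>();
      params.put("port", "5445");
      params.put("server-id", "0");

      int backupPortOffset = 100;   // assumed backup port offset value
      int existingBackups = 0;      // would be backupServers.size() for the first request
      int portOffset = backupPortOffset * (existingBackups + 1);
      String name = "colocated_backup_" + (existingBackups + 1);

      // the same transformation updatebackupParams applies per acceptor/connector
      Object port = params.get("port");
      if (port != null)
      {
         Integer offsetPort = Integer.valueOf(port.toString()) + portOffset;
         params.put("port", offsetPort.toString());
      }
      Object serverId = params.get("server-id");
      if (serverId != null)
      {
         params.put("server-id", serverId + "(" + name + ")");
      }

      System.out.println(params.get("port"));       // 5545
      System.out.println(params.get("server-id"));  // 0(colocated_backup_1)
   }
}
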
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ColocatedPolicy.java
----------------------------------------------------------------------
diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ColocatedPolicy.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ColocatedPolicy.java
new file mode 100644
index 0000000..f951ec9
--- /dev/null
+++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ColocatedPolicy.java
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.cluster.ha;
+
+import org.hornetq.api.config.HornetQDefaultConfiguration;
+import org.hornetq.core.server.impl.ColocatedActivation;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.server.impl.LiveActivation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class ColocatedPolicy implements HAPolicy<LiveActivation>
+{
+
+   /*live stuff*/
+   private boolean requestBackup = HornetQDefaultConfiguration.isDefaultHapolicyRequestBackup();
+
+   private int backupRequestRetries = HornetQDefaultConfiguration.getDefaultHapolicyBackupRequestRetries();
+
+   private long backupRequestRetryInterval = HornetQDefaultConfiguration.getDefaultHapolicyBackupRequestRetryInterval();
+
+   private int maxBackups = HornetQDefaultConfiguration.getDefaultHapolicyMaxBackups();
+
+   private int backupPortOffset = HornetQDefaultConfiguration.getDefaultHapolicyBackupPortOffset();
+
+   /*backup stuff*/
+   private List<String> excludedConnectors = new ArrayList<>();
+
+   private BackupPolicy backupPolicy;
+
+   private HAPolicy<LiveActivation> livePolicy;
+
+   public ColocatedPolicy(boolean requestBackup,
+                          int backupRequestRetries,
+                          long backupRequestRetryInterval,
+                          int maxBackups,
+                          int backupPortOffset,
+                          List<String> excludedConnectors,
+                          HAPolicy livePolicy,
+                          BackupPolicy backupPolicy)
+   {
+      this.requestBackup = requestBackup;
+      this.backupRequestRetries = backupRequestRetries;
+      this.backupRequestRetryInterval = backupRequestRetryInterval;
+      this.maxBackups = maxBackups;
+      this.backupPortOffset = backupPortOffset;
+      this.excludedConnectors = excludedConnectors;
+      this.livePolicy = livePolicy;
+      this.backupPolicy = backupPolicy;
+   }
+
+   @Override
+   public String getBackupGroupName()
+   {
+      return null;
+   }
+
+   @Override
+   public String getScaleDownGroupName()
+   {
+      return null;
+   }
+
+   @Override
+   public boolean isSharedStore()
+   {
+      return backupPolicy.isSharedStore();
+   }
+
+   @Override
+   public boolean isBackup()
+   {
+      return false;
+   }
+
+   @Override
+   public LiveActivation createActivation(HornetQServerImpl server, boolean wasLive, Map<String, Object> activationParams, HornetQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) throws Exception
+   {
+      return new ColocatedActivation(server, this, livePolicy.createActivation(server, wasLive, activationParams, shutdownOnCriticalIO));
+   }
+
+   @Override
+   public boolean canScaleDown()
+   {
+      return false;
+   }
+
+   @Override
+   public String getScaleDownClustername()
+   {
+      return null;
+   }
+
+   public boolean isRequestBackup()
+   {
+      return requestBackup;
+   }
+
+   public void setRequestBackup(boolean requestBackup)
+   {
+      this.requestBackup = requestBackup;
+   }
+
+   public int getBackupRequestRetries()
+   {
+      return backupRequestRetries;
+   }
+
+   public void setBackupRequestRetries(int backupRequestRetries)
+   {
+      this.backupRequestRetries = backupRequestRetries;
+   }
+
+   public long getBackupRequestRetryInterval()
+   {
+      return backupRequestRetryInterval;
+   }
+
+   public void setBackupRequestRetryInterval(long backupRequestRetryInterval)
+   {
+      this.backupRequestRetryInterval = backupRequestRetryInterval;
+   }
+
+   public int getMaxBackups()
+   {
+      return maxBackups;
+   }
+
+   public void setMaxBackups(int maxBackups)
+   {
+      this.maxBackups = maxBackups;
+   }
+
+   public int getBackupPortOffset()
+   {
+      return backupPortOffset;
+   }
+
+   public void setBackupPortOffset(int backupPortOffset)
+   {
+      this.backupPortOffset = backupPortOffset;
+   }
+
+   public List<String> getExcludedConnectors()
+   {
+      return excludedConnectors;
+   }
+
+   public void setExcludedConnectors(List<String> excludedConnectors)
+   {
+      this.excludedConnectors = excludedConnectors;
+   }
+
+   public HAPolicy<LiveActivation> getLivePolicy()
+   {
+      return livePolicy;
+   }
+
+   public void setLivePolicy(HAPolicy<LiveActivation> livePolicy)
+   {
+      this.livePolicy = livePolicy;
+   }
+
+   public BackupPolicy getBackupPolicy()
+   {
+      return backupPolicy;
+   }
+
+   public void setBackupPolicy(BackupPolicy backupPolicy)
+   {
+      this.backupPolicy = backupPolicy;
+   }
+}

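For reference, ColocatedPolicy falls back to HornetQDefaultConfiguration for every value that is not supplied, so a quick way to see what a given build defaults to is to print them directly. This small sketch only calls the accessor methods already referenced in this diff and assumes the relevant HornetQ jar is on the classpath; the class name itself is made up for illustration.

import org.hornetq.api.config.HornetQDefaultConfiguration;

public class ColocatedPolicyDefaultsExample
{
   public static void main(String[] args)
   {
      // the same defaults used by ColocatedPolicy's field initialisers above
      System.out.println("requestBackup              = " + HornetQDefaultConfiguration.isDefaultHapolicyRequestBackup());
      System.out.println("backupRequestRetries       = " + HornetQDefaultConfiguration.getDefaultHapolicyBackupRequestRetries());
      System.out.println("backupRequestRetryInterval = " + HornetQDefaultConfiguration.getDefaultHapolicyBackupRequestRetryInterval());
      System.out.println("maxBackups                 = " + HornetQDefaultConfiguration.getDefaultHapolicyMaxBackups());
      System.out.println("backupPortOffset           = " + HornetQDefaultConfiguration.getDefaultHapolicyBackupPortOffset());
   }
}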