http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/HornetQMessageBundle.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/HornetQMessageBundle.java b/hornetq-server/src/main/java/org/hornetq/core/server/HornetQMessageBundle.java index 8b609e8..2034b61 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/HornetQMessageBundle.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/HornetQMessageBundle.java @@ -356,4 +356,16 @@ public interface HornetQMessageBundle @Message(id = 119105, value = "Server will not accept create session request since scale down has not occurred", format = Message.Format.MESSAGE_FORMAT) HornetQSessionCreationException sessionNotFailedOver(); + + @Message(id = 119106, value = "Invalid slow consumer policy type {0}", format = Message.Format.MESSAGE_FORMAT) + IllegalArgumentException invalidSlowConsumerPolicyType(String val); + + @Message(id = 119107, value = "consumer connections for address {0} closed by management", format = Message.Format.MESSAGE_FORMAT) + HornetQInternalErrorException consumerConnectionsClosedByManagement(String address); + + @Message(id = 119108, value = "connections for user {0} closed by management", format = Message.Format.MESSAGE_FORMAT) + HornetQInternalErrorException connectionsForUserClosedByManagement(String userName); + + @Message(id = 119109, value = "unsupported HA Policy Configuration {0}", format = Message.Format.MESSAGE_FORMAT) + HornetQIllegalStateException unsupportedHAPolicyConfiguration(Object o); }
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/HornetQServer.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/HornetQServer.java b/hornetq-server/src/main/java/org/hornetq/core/server/HornetQServer.java index 56bbc4a..7e9ee28 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/HornetQServer.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/HornetQServer.java @@ -16,11 +16,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - -import org.hornetq.api.core.HornetQException; -import org.hornetq.api.core.Pair; import org.hornetq.api.core.SimpleString; -import org.hornetq.api.core.TransportConfiguration; import org.hornetq.core.config.BridgeConfiguration; import org.hornetq.core.config.Configuration; import org.hornetq.core.config.DivertConfiguration; @@ -28,14 +24,14 @@ import org.hornetq.core.management.impl.HornetQServerControlImpl; import org.hornetq.core.paging.PagingManager; import org.hornetq.core.persistence.StorageManager; import org.hornetq.core.postoffice.PostOffice; -import org.hornetq.core.protocol.core.CoreRemotingConnection; import org.hornetq.core.remoting.server.RemotingService; -import org.hornetq.core.replication.ReplicationEndpoint; import org.hornetq.core.replication.ReplicationManager; import org.hornetq.core.security.Role; -import org.hornetq.core.server.cluster.ClusterConnection; +import org.hornetq.core.security.SecurityStore; import org.hornetq.core.server.cluster.ClusterManager; +import org.hornetq.core.server.cluster.ha.HAPolicy; import org.hornetq.core.server.group.GroupingHandler; +import org.hornetq.core.server.impl.Activation; import org.hornetq.core.server.impl.ConnectorsService; import org.hornetq.core.server.management.ManagementService; import org.hornetq.core.settings.HierarchicalRepository; @@ -112,7 +108,10 @@ public interface HornetQServer extends HornetQComponent boolean preAcknowledge, boolean xa, String defaultAddress, - SessionCallback callback) throws Exception; + SessionCallback callback, + ServerSessionFactory sessionFactory) throws Exception; + + SecurityStore getSecurityStore(); void removeSession(String name) throws Exception; @@ -153,17 +152,6 @@ public interface HornetQServer extends HornetQComponent boolean waitForActivation(long timeout, TimeUnit unit) throws InterruptedException; /** - * Wait for backup synchronization when using synchronization - * @param timeout - * @param unit - * @see CountDownLatch#await(long, TimeUnit) - * @return {@code true} if the server was already initialized or if it was initialized within the - * timeout period, {@code false} otherwise. - * @throws InterruptedException - */ - boolean waitForBackupSync(long timeout, TimeUnit unit) throws InterruptedException; - - /** * Creates a shared queue. if non durable it will exist as long as there are consumers. * * Notice: the queue won't be deleted until the first consumer arrives. @@ -211,8 +199,6 @@ public interface HornetQServer extends HornetQComponent GroupingHandler getGroupingHandler(); - ReplicationEndpoint getReplicationEndpoint(); - ReplicationManager getReplicationManager(); void deployDivert(DivertConfiguration config) throws Exception; @@ -238,21 +224,6 @@ public interface HornetQServer extends HornetQComponent void stop(boolean failoverOnServerShutdown) throws Exception; - /** - * Starts replication. - * <p> - * This will spawn a new thread that will sync all persistent data with the new backup. This - * method may also trigger fail-back if the backup asks for it and the server configuration - * allows. - * @param rc - * @param pair - * @param clusterConnection - * @throws HornetQAlreadyReplicatingException if replication is already taking place - * @throws HornetQException - */ - void startReplication(CoreRemotingConnection rc, ClusterConnection clusterConnection, - Pair<TransportConfiguration, TransportConfiguration> pair, boolean failBackRequest) throws HornetQException; - /* * add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is tur then this factory will * replace any factories with the same protocol @@ -269,4 +240,10 @@ public interface HornetQServer extends HornetQComponent void addScaledDownNode(SimpleString scaledDownNodeId); boolean hasScaledDown(SimpleString scaledDownNodeId); + + Activation getActivation(); + + HAPolicy getHAPolicy(); + + void setHAPolicy(HAPolicy haPolicy); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/HornetQServerLogger.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/HornetQServerLogger.java b/hornetq-server/src/main/java/org/hornetq/core/server/HornetQServerLogger.java index cdeb3d1..3309946 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/HornetQServerLogger.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/HornetQServerLogger.java @@ -58,6 +58,7 @@ import org.hornetq.core.server.cluster.impl.BridgeImpl; import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl; import org.hornetq.core.server.impl.HornetQServerImpl; import org.hornetq.core.server.impl.ServerSessionImpl; +import org.hornetq.core.server.management.Notification; import org.hornetq.utils.FutureLatch; import org.jboss.logging.BasicLogger; import org.jboss.logging.annotations.Cause; @@ -279,6 +280,18 @@ public interface HornetQServerLogger extends BasicLogger @Message(id = 221047, value = "Backup Server has scaled down to live server", format = Message.Format.MESSAGE_FORMAT) void backupServerScaledDown(); + @LogMessage(level = Logger.Level.INFO) + @Message(id = 221048, value = "Consumer {0}:{1} attached to queue ''{2}'' from {3} identified as ''slow.'' Expected consumption rate: {4} msgs/second; actual consumption rate: {5} msgs/second.", format = Message.Format.MESSAGE_FORMAT) + void slowConsumerDetected(String sessionID, long consumerID, String queueName, String remoteAddress, float slowConsumerThreshold, float consumerRate); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 221049, value = "Activating Replica for node: {0}", format = Message.Format.MESSAGE_FORMAT) + void activatingReplica(SimpleString nodeID); + + @LogMessage(level = Logger.Level.INFO) + @Message(id = 221050, value = "Activating Shared Store Slave", format = Message.Format.MESSAGE_FORMAT) + void activatingSharedStoreSlave(); + @LogMessage(level = Logger.Level.WARN) @Message(id = 222000, value = "HornetQServer is being finalized and has not been stopped. Please remember to stop the server before letting it go out of scope", format = Message.Format.MESSAGE_FORMAT) @@ -555,11 +568,6 @@ public interface HornetQServerLogger extends BasicLogger void errorProcessingIOCallback(Integer errorCode, String errorMessage); @LogMessage(level = Logger.Level.WARN) - @Message(id = 222064, value = "Client with version {0} and address {1} is not compatible with server version {2}. Please ensure all clients and servers are upgraded to the same version for them to interoperate properly", - format = Message.Format.MESSAGE_FORMAT) - void incompatibleVersion(Integer version, String remoteAddress, String fullVersion); - - @LogMessage(level = Logger.Level.WARN) @Message(id = 222065, value = "Client is not being consistent on the request versioning. It just sent a version id={0} while it informed {1} previously", format = Message.Format.MESSAGE_FORMAT) void incompatibleVersionAfterConnect(int version, int clientVersion); @@ -1082,7 +1090,7 @@ public interface HornetQServerLogger extends BasicLogger @LogMessage(level = Logger.Level.WARN) @Message(id = 222187, - value = "Failed to activate replicated backup", + value = "Failed to activate replicata", format = Message.Format.MESSAGE_FORMAT) void activateReplicatedBackupFailed(@Cause Throwable e); @@ -1092,6 +1100,12 @@ public interface HornetQServerLogger extends BasicLogger format = Message.Format.MESSAGE_FORMAT) void unableToFindTargetQueue(String targetNodeID); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222189, + value = "Failed to activate shared store slave", + format = Message.Format.MESSAGE_FORMAT) + void activateSharedStoreSlaveFailed(@Cause Throwable e); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) void initializationError(@Cause Throwable e); @@ -1330,4 +1344,17 @@ public interface HornetQServerLogger extends BasicLogger @LogMessage(level = Logger.Level.ERROR) @Message(id = 224061, value = "Setting both <{0}> and <ha-policy> is invalid. Please use <ha-policy> exclusively as <{0}> is deprecated. Ignoring <{0}> value.", format = Message.Format.MESSAGE_FORMAT) void incompatibleWithHAPolicy(String parameter); + + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 224062, value = "Failed to send SLOW_CONSUMER notification: {0}", format = Message.Format.MESSAGE_FORMAT) + void failedToSendSlowConsumerNotification(Notification notification, @Cause Exception e); + + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 224063, value = "Failed to close consumer connections for address {0}", format = Message.Format.MESSAGE_FORMAT) + void failedToCloseConsumerConnectionsForAddress(String address, @Cause Exception e); + + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 224064, value = "Setting <{0}> is invalid with this HA Policy Configuration. Please use <ha-policy> exclusively or remove. Ignoring <{0}> value.", format = Message.Format.MESSAGE_FORMAT) + void incompatibleWithHAPolicyChosen(String parameter); + } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/NodeManager.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/NodeManager.java b/hornetq-server/src/main/java/org/hornetq/core/server/NodeManager.java index 2fe948b..049a3a6 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/NodeManager.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/NodeManager.java @@ -39,7 +39,6 @@ public abstract class NodeManager implements HornetQComponent private final Object nodeIDGuard = new Object(); private SimpleString nodeID; private UUID uuid; - private String nodeGroupName; private boolean isStarted = false; protected FileChannel channel; @@ -122,16 +121,6 @@ public abstract class NodeManager implements HornetQComponent } } - public void setNodeGroupName(String nodeGroupName) - { - this.nodeGroupName = nodeGroupName; - } - - public String getNodeGroupName() - { - return nodeGroupName; - } - public abstract boolean isAwaitingFailback() throws Exception; public abstract boolean isBackupLive() throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/Queue.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/Queue.java b/hornetq-server/src/main/java/org/hornetq/core/server/Queue.java index 7526b44..afa03ff 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/Queue.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/Queue.java @@ -99,28 +99,8 @@ public interface Queue extends Bindable void destroyPaging() throws Exception; - /** - * It will wait for up to 10 seconds for a flush on the executors and return the number of messages added. - * if the executor is busy for any reason (say an unbehaved consumer) we will just return the current value. - * - * @return - */ long getMessageCount(); - /** - * This method will return the messages added after waiting some time on the flush executors. - * If the executor couldn't be flushed within the timeout we will just return the current value without any warn - * - * @param timeout Time to wait for current executors to finish in milliseconds. - * @return - */ - long getMessageCount(long timeout); - - /** - * Return the current message count without waiting for scheduled executors to finish - */ - long getInstantMessageCount(); - int getDeliveringCount(); void referenceHandled(); @@ -137,24 +117,9 @@ public interface Queue extends Bindable */ Map<String, List<MessageReference>> getDeliveringMessages(); - /** - * It will wait for up to 10 seconds for a flush on the executors and return the number of messages added. - * if the executor is busy for any reason (say an unbehaved consumer) we will just return the current value. - * - * @return - */ long getMessagesAdded(); - /** - * This method will return the messages added after waiting some time on the flush executors. - * If the executor couldn't be flushed within the timeout we will just return the current value without any warn - * - * @param timeout Time to wait for current executors to finish in milliseconds. - * @return - */ - long getMessagesAdded(long timeout); - - long getInstantMessagesAdded(); + long getMessagesAcknowledged(); MessageReference removeReferenceWithID(long id) throws Exception; @@ -255,7 +220,13 @@ public interface Queue extends Bindable void resetMessagesAdded(); + void resetMessagesAcknowledged(); + void incrementMesssagesAdded(); List<MessageReference> cancelScheduledMessages(); + + void postAcknowledge(MessageReference ref); + + float getRate(); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/ServerConsumer.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/ServerConsumer.java b/hornetq-server/src/main/java/org/hornetq/core/server/ServerConsumer.java index 279cff7..6d94d01 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/ServerConsumer.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/ServerConsumer.java @@ -25,25 +25,45 @@ import org.hornetq.core.transaction.Transaction; */ public interface ServerConsumer extends Consumer { + /** + * @see #getProtocolContext() + * @param protocolContext + */ + void setProtocolContext(Object protocolContext); + + /** + * An object set by the Protocol implementation. + * it could be anything pre-determined by the implementation + */ + Object getProtocolContext(); + long getID(); Object getConnectionID(); void close(boolean failed) throws Exception; + /** + * This method is just to remove itself from Queues. + * If for any reason during a close an exception occurred, the exception treatment + * will call removeItself what should take the consumer out of any queues. + * @throws Exception + */ + void removeItself() throws Exception; + List<MessageReference> cancelRefs(boolean failed, boolean lastConsumedAsDelivered, Transaction tx) throws Exception; void setStarted(boolean started); - void receiveCredits(int credits) throws Exception; + void receiveCredits(int credits); Queue getQueue(); MessageReference removeReferenceByID(long messageID) throws Exception; - void acknowledge(boolean autoCommitAcks, Transaction tx, long messageID) throws Exception; + void acknowledge(Transaction tx, long messageID) throws Exception; - void individualAcknowledge(boolean autoCommitAcks, Transaction tx, long messageID) throws Exception; + void individualAcknowledge(Transaction tx, long messageID) throws Exception; void individualCancel(final long messageID, boolean failed) throws Exception; @@ -56,6 +76,8 @@ public interface ServerConsumer extends Consumer long getCreationTime(); String getSessionID(); + + void promptDelivery(); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/ServerMessage.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/ServerMessage.java b/hornetq-server/src/main/java/org/hornetq/core/server/ServerMessage.java index cdbc028..6d84a3d 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/ServerMessage.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/ServerMessage.java @@ -27,7 +27,7 @@ import org.hornetq.core.paging.PagingStore; */ public interface ServerMessage extends MessageInternal, EncodingSupport { - void setMessageID(long id); + ServerMessage setMessageID(long id); MessageReference createReference(Queue queue); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/ServerSession.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/ServerSession.java b/hornetq-server/src/main/java/org/hornetq/core/server/ServerSession.java index 2fc4cb3..e8d523e 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/ServerSession.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/ServerSession.java @@ -43,7 +43,7 @@ public interface ServerSession RemotingConnection getRemotingConnection(); - void removeConsumer(long consumerID) throws Exception; + boolean removeConsumer(long consumerID) throws Exception; void acknowledge(long consumerID, long messageID) throws Exception; @@ -95,7 +95,7 @@ public interface ServerSession void deleteQueue(SimpleString name) throws Exception; - void createConsumer(long consumerID, SimpleString queueName, SimpleString filterString, boolean browseOnly) throws Exception; + ServerConsumer createConsumer(long consumerID, SimpleString queueName, SimpleString filterString, boolean browseOnly) throws Exception; QueueQueryResult executeQueueQuery(SimpleString name) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/ServerSessionFactory.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/ServerSessionFactory.java b/hornetq-server/src/main/java/org/hornetq/core/server/ServerSessionFactory.java new file mode 100644 index 0000000..8a752e8 --- /dev/null +++ b/hornetq-server/src/main/java/org/hornetq/core/server/ServerSessionFactory.java @@ -0,0 +1,41 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.server; + +import org.hornetq.api.core.SimpleString; +import org.hornetq.core.persistence.OperationContext; +import org.hornetq.core.persistence.StorageManager; +import org.hornetq.core.postoffice.PostOffice; +import org.hornetq.core.security.SecurityStore; +import org.hornetq.core.server.impl.HornetQServerImpl; +import org.hornetq.core.server.impl.ServerSessionImpl; +import org.hornetq.core.server.management.ManagementService; +import org.hornetq.core.transaction.ResourceManager; +import org.hornetq.spi.core.protocol.RemotingConnection; +import org.hornetq.spi.core.protocol.SessionCallback; + +public interface ServerSessionFactory +{ + + ServerSessionImpl createCoreSession(String name, String username, String password, + int minLargeMessageSize, boolean autoCommitSends, + boolean autoCommitAcks, boolean preAcknowledge, + boolean persistDeliveryCountBeforeDelivery, boolean xa, + RemotingConnection connection, StorageManager storageManager, + PostOffice postOffice, ResourceManager resourceManager, + SecurityStore securityStore, ManagementService managementService, + HornetQServerImpl hornetQServerImpl, SimpleString managementAddress, + SimpleString simpleString, SessionCallback callback, + OperationContext context) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/BackupManager.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/BackupManager.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/BackupManager.java index e313fab..37cdea5 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/BackupManager.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/BackupManager.java @@ -29,7 +29,6 @@ import org.hornetq.core.client.impl.ServerLocatorInternal; import org.hornetq.core.client.impl.Topology; import org.hornetq.core.config.ClusterConnectionConfiguration; import org.hornetq.core.config.Configuration; -import org.hornetq.core.protocol.ServerPacketDecoder; import org.hornetq.core.server.HornetQComponent; import org.hornetq.core.server.HornetQServer; import org.hornetq.core.server.HornetQServerLogger; @@ -80,7 +79,7 @@ public class BackupManager implements HornetQComponent for (BackupConnector conn : backupConnectors) { conn.start(); - if (configuration.getHAPolicy().isBackup() && configuration.getHAPolicy().isSharedStore()) + if (server.getHAPolicy().isBackup() && server.getHAPolicy().isSharedStore()) { conn.informTopology(); conn.announceBackup(); @@ -219,7 +218,7 @@ public class BackupManager implements HornetQComponent backupServerLocator.setIdentity("backupLocatorFor='" + server + "'"); backupServerLocator.setReconnectAttempts(-1); backupServerLocator.setInitialConnectAttempts(-1); - backupServerLocator.setPacketDecoder(ServerPacketDecoder.INSTANCE); + backupServerLocator.setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance()); } } @@ -260,8 +259,8 @@ public class BackupManager implements HornetQComponent clusterControl.authorize(); clusterControl.sendNodeAnnounce(System.currentTimeMillis(), nodeManager.getNodeId().toString(), - configuration.getHAPolicy().getBackupGroupName(), - configuration.getHAPolicy().getScaleDownClustername(), + server.getHAPolicy().getBackupGroupName(), + server.getHAPolicy().getScaleDownClustername(), true, connector, null); @@ -372,7 +371,7 @@ public class BackupManager implements HornetQComponent } ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs); locator.setClusterConnection(true); - locator.setPacketDecoder(ServerPacketDecoder.INSTANCE); + locator.setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance()); return locator; } return null; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterControl.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterControl.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterControl.java index d3907f9..0ace46c 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterControl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterControl.java @@ -92,10 +92,10 @@ public class ClusterControl implements AutoCloseable * server. * @throws org.hornetq.api.core.HornetQException */ - public void announceReplicatingBackupToLive(final boolean attemptingFailBack) throws HornetQException + public void announceReplicatingBackupToLive(final boolean attemptingFailBack, String replicationClusterName) throws HornetQException { - ClusterConnectionConfiguration config = ConfigurationUtils.getReplicationClusterConfiguration(server.getConfiguration()); + ClusterConnectionConfiguration config = ConfigurationUtils.getReplicationClusterConfiguration(server.getConfiguration(), replicationClusterName); if (config == null) { HornetQServerLogger.LOGGER.announceBackupNoClusterConnections(); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterController.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterController.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterController.java index 6d7abca..8c028fe 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterController.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterController.java @@ -21,7 +21,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.hornetq.api.core.DiscoveryGroupConfiguration; -import org.hornetq.api.core.HornetQAlreadyReplicatingException; import org.hornetq.api.core.HornetQException; import org.hornetq.api.core.Interceptor; import org.hornetq.api.core.Pair; @@ -34,16 +33,11 @@ import org.hornetq.core.client.impl.ClientSessionFactoryInternal; import org.hornetq.core.client.impl.ServerLocatorImpl; import org.hornetq.core.client.impl.ServerLocatorInternal; import org.hornetq.core.client.impl.Topology; -import org.hornetq.core.protocol.ServerPacketDecoder; import org.hornetq.core.protocol.core.Channel; import org.hornetq.core.protocol.core.ChannelHandler; import org.hornetq.core.protocol.core.CoreRemotingConnection; import org.hornetq.core.protocol.core.Packet; import org.hornetq.core.protocol.core.impl.PacketImpl; -import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationMessage; -import org.hornetq.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage; -import org.hornetq.core.protocol.core.impl.wireformat.BackupRequestMessage; -import org.hornetq.core.protocol.core.impl.wireformat.BackupResponseMessage; import org.hornetq.core.protocol.core.impl.wireformat.ClusterConnectMessage; import org.hornetq.core.protocol.core.impl.wireformat.ClusterConnectReplyMessage; import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage; @@ -53,10 +47,10 @@ import org.hornetq.core.protocol.core.impl.wireformat.ScaleDownAnnounceMessage; import org.hornetq.core.server.HornetQComponent; import org.hornetq.core.server.HornetQServer; import org.hornetq.core.server.HornetQServerLogger; -import org.hornetq.core.server.cluster.ha.HAPolicy; import org.hornetq.core.server.cluster.qourum.QuorumManager; import org.hornetq.core.server.cluster.qourum.QuorumVoteHandler; import org.hornetq.core.server.cluster.qourum.Vote; +import org.hornetq.core.server.impl.Activation; import org.hornetq.spi.core.remoting.Acceptor; /** @@ -83,6 +77,7 @@ public class ClusterController implements HornetQComponent private CountDownLatch replicationClusterConnectedLatch; private boolean started; + private SimpleString replicatedClusterName; public ClusterController(HornetQServer server, ScheduledExecutorService scheduledExecutor) { @@ -99,9 +94,9 @@ public class ClusterController implements HornetQComponent //set the default locator that will be used to connecting to the default cluster. defaultLocator = locators.get(defaultClusterConnectionName); //create a locator for replication, either the default or the specified if not set - if (server.getConfiguration().getHAPolicy().getReplicationClustername() != null && !server.getConfiguration().getHAPolicy().getReplicationClustername().equals(defaultClusterConnectionName.toString())) + if (replicatedClusterName != null && !replicatedClusterName.equals(defaultClusterConnectionName)) { - replicationLocator = locators.get(server.getConfiguration().getHAPolicy().getReplicationClustername()); + replicationLocator = locators.get(replicatedClusterName); if (replicationLocator == null) { HornetQServerLogger.LOGGER.noClusterConnectionForReplicationCluster(); @@ -172,7 +167,7 @@ public class ClusterController implements HornetQComponent serverLocator.setReconnectAttempts(-1); serverLocator.setInitialConnectAttempts(-1); //this is used for replication so need to use the server packet decoder - serverLocator.setPacketDecoder(ServerPacketDecoder.INSTANCE); + serverLocator.setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance()); locators.put(name, serverLocator); } @@ -189,7 +184,7 @@ public class ClusterController implements HornetQComponent serverLocator.setReconnectAttempts(-1); serverLocator.setInitialConnectAttempts(-1); //this is used for replication so need to use the server packet decoder - serverLocator.setPacketDecoder(ServerPacketDecoder.INSTANCE); + serverLocator.setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance()); locators.put(name, serverLocator); } @@ -253,7 +248,7 @@ public class ClusterController implements HornetQComponent */ public ClusterControl connectToNodeInCluster(ClientSessionFactoryInternal sf) { - ((ServerLocatorInternal)sf.getServerLocator()).setPacketDecoder(ServerPacketDecoder.INSTANCE); + sf.getServerLocator().setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance()); return new ClusterControl(sf, server); } @@ -280,14 +275,14 @@ public class ClusterController implements HornetQComponent /** * used to set a channel handler on the connection that can be used by the cluster control - * - * @param channel the channel to set the handler + * @param channel the channel to set the handler * @param acceptorUsed the acceptor used for connection * @param remotingConnection the connection itself + * @param activation */ - public void addClusterChannelHandler(Channel channel, Acceptor acceptorUsed, CoreRemotingConnection remotingConnection) + public void addClusterChannelHandler(Channel channel, Acceptor acceptorUsed, CoreRemotingConnection remotingConnection, Activation activation) { - channel.setHandler(new ClusterControllerChannelHandler(channel, acceptorUsed, remotingConnection)); + channel.setHandler(new ClusterControllerChannelHandler(channel, acceptorUsed, remotingConnection, activation.getActivationChannelHandler(channel, acceptorUsed))); } public int getDefaultClusterSize() @@ -310,6 +305,11 @@ public class ClusterController implements HornetQComponent return server.getIdentity(); } + public void setReplicatedClusterName(String replicatedClusterName) + { + this.replicatedClusterName = new SimpleString(replicatedClusterName); + } + /** * a handler for handling packets sent between the cluster. */ @@ -318,13 +318,15 @@ public class ClusterController implements HornetQComponent private final Channel clusterChannel; private final Acceptor acceptorUsed; private final CoreRemotingConnection remotingConnection; + private final ChannelHandler channelHandler; boolean authorized = false; - public ClusterControllerChannelHandler(Channel clusterChannel, Acceptor acceptorUsed, CoreRemotingConnection remotingConnection) + public ClusterControllerChannelHandler(Channel clusterChannel, Acceptor acceptorUsed, CoreRemotingConnection remotingConnection, ChannelHandler channelHandler) { this.clusterChannel = clusterChannel; this.acceptorUsed = acceptorUsed; this.remotingConnection = remotingConnection; + this.channelHandler = channelHandler; } @Override @@ -336,6 +338,12 @@ public class ClusterController implements HornetQComponent { ClusterConnection clusterConnection = acceptorUsed.getClusterConnection(); + //if this acceptor isnt associated with a cluster connection use the default + if (clusterConnection == null) + { + clusterConnection = server.getClusterManager().getDefaultConnection(null); + } + ClusterConnectMessage msg = (ClusterConnectMessage) packet; if (server.getConfiguration().isSecurityEnabled() && !clusterConnection.verify(msg.getClusterUser(), msg.getClusterPassword())) @@ -387,24 +395,6 @@ public class ClusterController implements HornetQComponent HornetQServerLogger.LOGGER.debug("there is no acceptor used configured at the CoreProtocolManager " + this); } } - else if (packet.getType() == PacketImpl.BACKUP_REGISTRATION) - { - BackupRegistrationMessage msg = (BackupRegistrationMessage)packet; - ClusterConnection clusterConnection = acceptorUsed.getClusterConnection(); - try - { - server.startReplication(remotingConnection, clusterConnection, getPair(msg.getConnector(), true), - msg.isFailBackRequest()); - } - catch (HornetQAlreadyReplicatingException are) - { - clusterChannel.send(new BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.ALREADY_REPLICATING)); - } - catch (HornetQException e) - { - clusterChannel.send(new BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.EXCEPTION)); - } - } else if (packet.getType() == PacketImpl.QUORUM_VOTE) { QuorumVoteMessage quorumVoteMessage = (QuorumVoteMessage) packet; @@ -413,31 +403,6 @@ public class ClusterController implements HornetQComponent Vote vote = quorumManager.vote(quorumVoteMessage.getHandler(), quorumVoteMessage.getVote()); clusterChannel.send(new QuorumVoteReplyMessage(quorumVoteMessage.getHandler(), vote)); } - else if (packet.getType() == PacketImpl.BACKUP_REQUEST) - { - BackupRequestMessage backupRequestMessage = (BackupRequestMessage) packet; - boolean started = false; - try - { - if (backupRequestMessage.getBackupType() == HAPolicy.POLICY_TYPE.COLOCATED_REPLICATED) - { - started = server.getClusterManager().getHAManager().activateReplicatedBackup(backupRequestMessage.getBackupSize(), backupRequestMessage.getNodeID()); - } - else - { - started = server.getClusterManager().getHAManager().activateSharedStoreBackup(backupRequestMessage.getBackupSize(), - backupRequestMessage.getJournalDirectory(), - backupRequestMessage.getBindingsDirectory(), - backupRequestMessage.getLargeMessagesDirectory(), - backupRequestMessage.getPagingDirectory()); - } - } - catch (Exception e) - { - //todo log a warning and send false - } - clusterChannel.send(new BackupResponseMessage(started)); - } else if (packet.getType() == PacketImpl.SCALEDOWN_ANNOUNCEMENT) { ScaleDownAnnounceMessage message = (ScaleDownAnnounceMessage) packet; @@ -447,17 +412,13 @@ public class ClusterController implements HornetQComponent server.addScaledDownNode(message.getScaledDownNodeId()); } } + else if (channelHandler != null) + { + channelHandler.handlePacket(packet); + } } } - private Pair<TransportConfiguration, TransportConfiguration> getPair(TransportConfiguration conn, - boolean isBackup) - { - if (isBackup) - { - return new Pair<>(null, conn); - } - return new Pair<>(conn, null); - } + } /** http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterManager.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterManager.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterManager.java index bd9ba38..62e8f22 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterManager.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ClusterManager.java @@ -55,8 +55,8 @@ import org.hornetq.core.server.cluster.impl.BridgeImpl; import org.hornetq.core.server.cluster.impl.BroadcastGroupImpl; import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl; import org.hornetq.core.server.cluster.qourum.QuorumManager; +import org.hornetq.core.server.impl.Activation; import org.hornetq.core.server.management.ManagementService; -import org.hornetq.core.settings.impl.AddressSettings; import org.hornetq.spi.core.protocol.RemotingConnection; import org.hornetq.spi.core.remoting.Acceptor; import org.hornetq.utils.ConcurrentHashSet; @@ -66,8 +66,8 @@ import org.hornetq.utils.FutureLatch; /** * A ClusterManager manages {@link ClusterConnection}s, {@link BroadcastGroup}s and {@link Bridge}s. * <p/> - * Note that {@link ClusterConnectionBridge}s extend Bridges but are controlled over through - * {@link ClusterConnectionImpl}. As a node is discovered a new {@link ClusterConnectionBridge} is + * Note that {@link org.hornetq.core.server.cluster.impl.ClusterConnectionBridge}s extend Bridges but are controlled over through + * {@link ClusterConnectionImpl}. As a node is discovered a new {@link org.hornetq.core.server.cluster.impl.ClusterConnectionBridge} is * deployed. * * @author <a href="mailto:[email protected]">Tim Fox</a> @@ -112,9 +112,9 @@ public final class ClusterManager implements HornetQComponent return haManager; } - public void addClusterChannelHandler(Channel channel, Acceptor acceptorUsed, CoreRemotingConnection remotingConnection) + public void addClusterChannelHandler(Channel channel, Acceptor acceptorUsed, CoreRemotingConnection remotingConnection, Activation activation) { - clusterController.addClusterChannelHandler(channel, acceptorUsed, remotingConnection); + clusterController.addClusterChannelHandler(channel, acceptorUsed, remotingConnection, activation); } enum State @@ -172,7 +172,7 @@ public final class ClusterManager implements HornetQComponent clusterController = new ClusterController(server, scheduledExecutor); - haManager = new HAManager(server.getConfiguration().getHAPolicy(), server.getSecurityManager(), server, server.getConfiguration().getBackupServerConfigurations()); + haManager = server.getActivation().getHAManager(); } public String describe() @@ -235,12 +235,12 @@ public final class ClusterManager implements HornetQComponent public String getBackupGroupName() { - return configuration.getHAPolicy().getBackupGroupName(); + return server.getHAPolicy().getBackupGroupName(); } public String getScaleDownGroupName() { - return haManager.getHAPolicy().getScaleDownGroupName(); + return server.getHAPolicy().getScaleDownGroupName(); } public synchronized void deploy() throws Exception @@ -514,30 +514,6 @@ public final class ClusterManager implements HornetQComponent } - if (config.getForwardingAddress() != null) - { - AddressSettings addressConfig = configuration.getAddressesSettings().get(config.getForwardingAddress()); - - // The address config could be null on certain test cases or some Embedded environment - if (addressConfig == null) - { - // We will certainly have this warning on testcases which is ok - HornetQServerLogger.LOGGER.bridgeCantFindAddressConfig(config.getName(), config.getForwardingAddress()); - } - else - { - final int windowSize = config.getConfirmationWindowSize(); - final long maxBytes = addressConfig.getMaxSizeBytes(); - - if (maxBytes != -1 && maxBytes < windowSize) - { - HornetQServerLogger.LOGGER.bridgeConfirmationWindowTooSmall(config.getName(), - config.getForwardingAddress(), windowSize, - maxBytes); - } - } - } - serverLocator.setIdentity("Bridge " + config.getName()); serverLocator.setConfirmationWindowSize(config.getConfirmationWindowSize()); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/HornetQServerSideProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/HornetQServerSideProtocolManagerFactory.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/HornetQServerSideProtocolManagerFactory.java new file mode 100644 index 0000000..b45cf94 --- /dev/null +++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/HornetQServerSideProtocolManagerFactory.java @@ -0,0 +1,55 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.core.server.cluster; + +import org.hornetq.core.protocol.ServerPacketDecoder; +import org.hornetq.core.protocol.core.impl.HornetQClientProtocolManager; +import org.hornetq.core.protocol.core.impl.PacketDecoder; +import org.hornetq.spi.core.remoting.ClientProtocolManager; +import org.hornetq.spi.core.remoting.ClientProtocolManagerFactory; + +/** + * A protocol manager that will replace the packet manager for inter-server communications + * @author Clebert Suconic + */ +public class HornetQServerSideProtocolManagerFactory implements ClientProtocolManagerFactory +{ + private static final HornetQServerSideProtocolManagerFactory INSTANCE = new HornetQServerSideProtocolManagerFactory(); + + public static HornetQServerSideProtocolManagerFactory getInstance() + { + return INSTANCE; + } + + private HornetQServerSideProtocolManagerFactory() + { + } + + private static final long serialVersionUID = 1; + + @Override + public ClientProtocolManager newProtocolManager() + { + return new HornetQReplicationProtocolManager(); + } + + class HornetQReplicationProtocolManager extends HornetQClientProtocolManager + { + @Override + protected PacketDecoder getPacketDecoder() + { + return ServerPacketDecoder.INSTANCE; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/BackupPolicy.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/BackupPolicy.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/BackupPolicy.java new file mode 100644 index 0000000..f6749df --- /dev/null +++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/BackupPolicy.java @@ -0,0 +1,61 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.server.cluster.ha; + +import org.hornetq.api.config.HornetQDefaultConfiguration; +import org.hornetq.core.server.impl.Activation; + +public abstract class BackupPolicy implements HAPolicy<Activation> +{ + protected ScaleDownPolicy scaleDownPolicy; + protected boolean restartBackup = HornetQDefaultConfiguration.isDefaultRestartBackup(); + + public ScaleDownPolicy getScaleDownPolicy() + { + return scaleDownPolicy; + } + + public void setScaleDownPolicy(ScaleDownPolicy scaleDownPolicy) + { + this.scaleDownPolicy = scaleDownPolicy; + } + + + @Override + public boolean isBackup() + { + return true; + } + + @Override + public String getScaleDownClustername() + { + return null; + } + + @Override + public String getScaleDownGroupName() + { + return getScaleDownPolicy() != null ? getScaleDownPolicy().getGroupName() : null; + } + + public boolean isRestartBackup() + { + return restartBackup; + } + + public void setRestartBackup(boolean restartBackup) + { + this.restartBackup = restartBackup; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ColocatedHAManager.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ColocatedHAManager.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ColocatedHAManager.java new file mode 100644 index 0000000..85e97a0 --- /dev/null +++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ColocatedHAManager.java @@ -0,0 +1,313 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.server.cluster.ha; + +import org.hornetq.api.core.Pair; +import org.hornetq.api.core.SimpleString; +import org.hornetq.api.core.TransportConfiguration; +import org.hornetq.api.core.client.TopologyMember; +import org.hornetq.core.config.Configuration; +import org.hornetq.core.server.ActivationParams; +import org.hornetq.core.server.HornetQServer; +import org.hornetq.core.server.HornetQServerLogger; +import org.hornetq.core.server.cluster.ClusterControl; +import org.hornetq.core.server.cluster.ClusterController; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class ColocatedHAManager implements HAManager +{ + + private final ColocatedPolicy haPolicy; + + private final HornetQServer server; + + private Map<String, HornetQServer> backupServers = new HashMap<>(); + + private boolean started; + + public ColocatedHAManager(ColocatedPolicy haPolicy, HornetQServer hornetQServer) + { + this.haPolicy = haPolicy; + server = hornetQServer; + } + + /** + * starts the HA manager. + */ + public void start() + { + if (started) + return; + + server.getActivation().haStarted(); + + started = true; + } + + /** + * stop any backups + */ + public void stop() + { + for (HornetQServer hornetQServer : backupServers.values()) + { + try + { + hornetQServer.stop(); + } + catch (Exception e) + { + e.printStackTrace(); + //todo + } + } + backupServers.clear(); + started = false; + } + + @Override + public boolean isStarted() + { + return started; + } + + public synchronized boolean activateBackup(int backupSize, String journalDirectory, String bindingsDirectory, String largeMessagesDirectory, String pagingDirectory, SimpleString nodeID) throws Exception + { + if (backupServers.size() >= haPolicy.getMaxBackups() || backupSize != backupServers.size()) + { + return false; + } + if (haPolicy.getBackupPolicy().isSharedStore()) + { + return activateSharedStoreBackup(journalDirectory, bindingsDirectory, largeMessagesDirectory, pagingDirectory); + } + else + { + return activateReplicatedBackup(nodeID); + } + } + + + /** + * return the current backup servers + * + * @return the backups + */ + public Map<String, HornetQServer> getBackupServers() + { + return backupServers; + } + + /** + * send a request to a live server to start a backup for us + * + * @param connectorPair the connector for the node to request a backup from + * @param backupSize the current size of the requested nodes backups + * @param replicated + * @return true if the request wa successful. + * @throws Exception + */ + public boolean requestBackup(Pair<TransportConfiguration, TransportConfiguration> connectorPair, int backupSize, boolean replicated) throws Exception + { + ClusterController clusterController = server.getClusterManager().getClusterController(); + try + ( + ClusterControl clusterControl = clusterController.connectToNode(connectorPair.getA()); + ) + { + clusterControl.authorize(); + if (replicated) + { + return clusterControl.requestReplicatedBackup(backupSize, server.getNodeID()); + } + else + { + return clusterControl.requestSharedStoreBackup(backupSize, + server.getConfiguration().getJournalDirectory(), + server.getConfiguration().getBindingsDirectory(), + server.getConfiguration().getLargeMessagesDirectory(), + server.getConfiguration().getPagingDirectory()); + + } + } + } + + private synchronized boolean activateSharedStoreBackup(String journalDirectory, String bindingsDirectory, String largeMessagesDirectory, String pagingDirectory) throws Exception + { + Configuration configuration = server.getConfiguration().copy(); + HornetQServer backup = server.createBackupServer(configuration); + try + { + int portOffset = haPolicy.getBackupPortOffset() * (backupServers.size() + 1); + String name = "colocated_backup_" + backupServers.size() + 1; + //make sure we don't restart as we are colocated + haPolicy.getBackupPolicy().setRestartBackup(false); + //set the backup policy + backup.setHAPolicy(haPolicy.getBackupPolicy()); + updateSharedStoreConfiguration(configuration, name, portOffset, haPolicy.getExcludedConnectors(), journalDirectory, bindingsDirectory, largeMessagesDirectory, pagingDirectory, haPolicy.getBackupPolicy().getScaleDownPolicy() == null); + + backupServers.put(configuration.getName(), backup); + backup.start(); + } + catch (Exception e) + { + backup.stop(); + HornetQServerLogger.LOGGER.activateSharedStoreSlaveFailed(e); + return false; + } + HornetQServerLogger.LOGGER.activatingSharedStoreSlave(); + return true; + } + + /** + * activate a backup server replicating from a specified node. + * + * decline and the requesting server can cast a re vote + * @param nodeID the id of the node to replicate from + * @return true if the server was created and started + * @throws Exception + */ + private synchronized boolean activateReplicatedBackup(SimpleString nodeID) throws Exception + { + Configuration configuration = server.getConfiguration().copy(); + HornetQServer backup = server.createBackupServer(configuration); + try + { + TopologyMember member = server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeID.toString()); + int portOffset = haPolicy.getBackupPortOffset() * (backupServers.size() + 1); + String name = "colocated_backup_" + backupServers.size() + 1; + //make sure we don't restart as we are colocated + haPolicy.getBackupPolicy().setRestartBackup(false); + //set the backup policy + backup.setHAPolicy(haPolicy.getBackupPolicy()); + updateReplicatedConfiguration(configuration, name, portOffset, haPolicy.getExcludedConnectors(), haPolicy.getBackupPolicy().getScaleDownPolicy() == null); + backup.addActivationParam(ActivationParams.REPLICATION_ENDPOINT, member); + backupServers.put(configuration.getName(), backup); + backup.start(); + } + catch (Exception e) + { + backup.stop(); + HornetQServerLogger.LOGGER.activateReplicatedBackupFailed(e); + return false; + } + HornetQServerLogger.LOGGER.activatingReplica(nodeID); + return true; + } + + /** + * update the backups configuration + * @param backupConfiguration the configuration to update + * @param name the new name of the backup + * @param portOffset the offset for the acceptors and any connectors that need changing + * @param remoteConnectors the connectors that don't need off setting, typically remote + * @param journalDirectory + * @param bindingsDirectory + * @param largeMessagesDirectory + * @param pagingDirectory + * @param fullServer + */ + private static void updateSharedStoreConfiguration(Configuration backupConfiguration, + String name, + int portOffset, + List<String> remoteConnectors, + String journalDirectory, + String bindingsDirectory, + String largeMessagesDirectory, + String pagingDirectory, + boolean fullServer) + { + backupConfiguration.setName(name); + backupConfiguration.setJournalDirectory(journalDirectory); + backupConfiguration.setBindingsDirectory(bindingsDirectory); + backupConfiguration.setLargeMessagesDirectory(largeMessagesDirectory); + backupConfiguration.setPagingDirectory(pagingDirectory); + updateAcceptorsAndConnectors(backupConfiguration, portOffset, remoteConnectors, fullServer); + } + + /** + * update the backups configuration + * + * @param backupConfiguration the configuration to update + * @param name the new name of the backup + * @param portOffset the offset for the acceptors and any connectors that need changing + * @param remoteConnectors the connectors that don't need off setting, typically remote + */ + private static void updateReplicatedConfiguration(Configuration backupConfiguration, + String name, + int portOffset, + List<String> remoteConnectors, + boolean fullServer) + { + backupConfiguration.setName(name); + backupConfiguration.setJournalDirectory(backupConfiguration.getJournalDirectory() + name); + backupConfiguration.setPagingDirectory(backupConfiguration.getPagingDirectory() + name); + backupConfiguration.setLargeMessagesDirectory(backupConfiguration.getLargeMessagesDirectory() + name); + backupConfiguration.setBindingsDirectory(backupConfiguration.getBindingsDirectory() + name); + updateAcceptorsAndConnectors(backupConfiguration, portOffset, remoteConnectors, fullServer); + } + + private static void updateAcceptorsAndConnectors(Configuration backupConfiguration, + int portOffset, + List<String> remoteConnectors, + boolean fullServer) + { + //we only do this if we are a full server, if scale down then our acceptors wont be needed and our connectors will + // be the same as the parent server + if (fullServer) + { + Set<TransportConfiguration> acceptors = backupConfiguration.getAcceptorConfigurations(); + for (TransportConfiguration acceptor : acceptors) + { + updatebackupParams(backupConfiguration.getName(), portOffset, acceptor.getParams()); + } + Map<String, TransportConfiguration> connectorConfigurations = backupConfiguration.getConnectorConfigurations(); + for (Map.Entry<String, TransportConfiguration> entry : connectorConfigurations.entrySet()) + { + //check to make sure we aren't a remote connector as this shouldn't be changed + if (!remoteConnectors.contains(entry.getValue().getName())) + { + updatebackupParams(backupConfiguration.getName(), portOffset, entry.getValue().getParams()); + } + } + } + else + { + //if we are scaling down then we wont need any acceptors but clear anyway for belts and braces + backupConfiguration.getAcceptorConfigurations().clear(); + } + } + + private static void updatebackupParams(String name, int portOffset, Map<String, Object> params) + { + if (params != null) + { + Object port = params.get("port"); + if (port != null) + { + Integer integer = Integer.valueOf(port.toString()); + integer += portOffset; + params.put("port", integer.toString()); + } + Object serverId = params.get("server-id"); + if (serverId != null) + { + params.put("server-id", serverId.toString() + "(" + name + ")"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ColocatedPolicy.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ColocatedPolicy.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ColocatedPolicy.java new file mode 100644 index 0000000..f951ec9 --- /dev/null +++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ColocatedPolicy.java @@ -0,0 +1,187 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.server.cluster.ha; + +import org.hornetq.api.config.HornetQDefaultConfiguration; +import org.hornetq.core.server.impl.ColocatedActivation; +import org.hornetq.core.server.impl.HornetQServerImpl; +import org.hornetq.core.server.impl.LiveActivation; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class ColocatedPolicy implements HAPolicy<LiveActivation> +{ + + /*live stuff*/ + private boolean requestBackup = HornetQDefaultConfiguration.isDefaultHapolicyRequestBackup(); + + private int backupRequestRetries = HornetQDefaultConfiguration.getDefaultHapolicyBackupRequestRetries(); + + private long backupRequestRetryInterval = HornetQDefaultConfiguration.getDefaultHapolicyBackupRequestRetryInterval(); + + private int maxBackups = HornetQDefaultConfiguration.getDefaultHapolicyMaxBackups(); + + private int backupPortOffset = HornetQDefaultConfiguration.getDefaultHapolicyBackupPortOffset(); + + /*backup stuff*/ + private List<String> excludedConnectors = new ArrayList<>(); + + private BackupPolicy backupPolicy; + + private HAPolicy<LiveActivation> livePolicy; + + public ColocatedPolicy(boolean requestBackup, + int backupRequestRetries, + long backupRequestRetryInterval, + int maxBackups, + int backupPortOffset, + List<String> excludedConnectors, + HAPolicy livePolicy, + BackupPolicy backupPolicy) + { + this.requestBackup = requestBackup; + this.backupRequestRetries = backupRequestRetries; + this.backupRequestRetryInterval = backupRequestRetryInterval; + this.maxBackups = maxBackups; + this.backupPortOffset = backupPortOffset; + this.excludedConnectors = excludedConnectors; + this.livePolicy = livePolicy; + this.backupPolicy = backupPolicy; + } + + @Override + public String getBackupGroupName() + { + return null; + } + + @Override + public String getScaleDownGroupName() + { + return null; + } + + @Override + public boolean isSharedStore() + { + return backupPolicy.isSharedStore(); + } + + @Override + public boolean isBackup() + { + return false; + } + + @Override + public LiveActivation createActivation(HornetQServerImpl server, boolean wasLive, Map<String, Object> activationParams, HornetQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) throws Exception + { + return new ColocatedActivation(server, this, livePolicy.createActivation(server, wasLive, activationParams, shutdownOnCriticalIO)); + } + + @Override + public boolean canScaleDown() + { + return false; + } + + @Override + public String getScaleDownClustername() + { + return null; + } + + + + public boolean isRequestBackup() + { + return requestBackup; + } + + public void setRequestBackup(boolean requestBackup) + { + this.requestBackup = requestBackup; + } + + public int getBackupRequestRetries() + { + return backupRequestRetries; + } + + public void setBackupRequestRetries(int backupRequestRetries) + { + this.backupRequestRetries = backupRequestRetries; + } + + public long getBackupRequestRetryInterval() + { + return backupRequestRetryInterval; + } + + public void setBackupRequestRetryInterval(long backupRequestRetryInterval) + { + this.backupRequestRetryInterval = backupRequestRetryInterval; + } + + public int getMaxBackups() + { + return maxBackups; + } + + public void setMaxBackups(int maxBackups) + { + this.maxBackups = maxBackups; + } + + public int getBackupPortOffset() + { + return backupPortOffset; + } + + public void setBackupPortOffset(int backupPortOffset) + { + this.backupPortOffset = backupPortOffset; + } + + public List<String> getExcludedConnectors() + { + return excludedConnectors; + } + + public void setExcludedConnectors(List<String> excludedConnectors) + { + this.excludedConnectors = excludedConnectors; + } + + public HAPolicy<LiveActivation> getLivePolicy() + { + return livePolicy; + } + + public void setLivePolicy(HAPolicy<LiveActivation> livePolicy) + { + this.livePolicy = livePolicy; + } + + public BackupPolicy getBackupPolicy() + { + return backupPolicy; + } + + public void setBackupPolicy(BackupPolicy backupPolicy) + { + this.backupPolicy = backupPolicy; + } +}
