http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAManager.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAManager.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAManager.java index ec1892c..ff61df9 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAManager.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAManager.java @@ -12,574 +12,20 @@ */ package org.hornetq.core.server.cluster.ha; -import org.hornetq.api.core.DiscoveryGroupConfiguration; -import org.hornetq.api.core.HornetQBuffer; -import org.hornetq.api.core.HornetQException; -import org.hornetq.api.core.Pair; -import org.hornetq.api.core.SimpleString; -import org.hornetq.api.core.TransportConfiguration; -import org.hornetq.api.core.client.HornetQClient; -import org.hornetq.api.core.client.TopologyMember; -import org.hornetq.core.client.impl.ServerLocatorInternal; -import org.hornetq.core.client.impl.Topology; -import org.hornetq.core.client.impl.TopologyMemberImpl; -import org.hornetq.core.config.BackupStrategy; -import org.hornetq.core.config.Configuration; -import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory; -import org.hornetq.core.server.ActivationParams; import org.hornetq.core.server.HornetQComponent; -import org.hornetq.core.server.HornetQMessageBundle; import org.hornetq.core.server.HornetQServer; -import org.hornetq.core.server.HornetQServerLogger; -import org.hornetq.core.server.cluster.ClusterControl; -import org.hornetq.core.server.cluster.ClusterController; -import org.hornetq.core.server.cluster.qourum.QuorumVote; -import org.hornetq.core.server.cluster.qourum.QuorumVoteHandler; -import org.hornetq.core.server.cluster.qourum.Vote; -import org.hornetq.spi.core.security.HornetQSecurityManager; -import 
java.lang.reflect.Array; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; /* -* An HAManager takes care of any colocated backups in a VM. These are either pre configured backups or backups requested -* by other lives. It also takes care of the quorum voting to request backups. +* An HAManager takes care of any colocated backups in a VM. * */ -public class HAManager implements HornetQComponent +public interface HAManager extends HornetQComponent { - private static final SimpleString REQUEST_BACKUP_QUORUM_VOTE = new SimpleString("RequestBackupQuorumVote"); - - private final HAPolicy haPolicy; - - private final HornetQSecurityManager securityManager; - - private final HornetQServer server; - - private Set<Configuration> backupServerConfigurations; - - private Map<String, HornetQServer> backupServers = new HashMap<>(); - - private boolean started; - - public HAManager(HAPolicy haPolicy, HornetQSecurityManager securityManager, HornetQServer hornetQServer, Set<Configuration> backupServerConfigurations) - { - this.haPolicy = haPolicy; - this.securityManager = securityManager; - server = hornetQServer; - this.backupServerConfigurations = backupServerConfigurations; - } - - /** - * starts the HA manager, any pre configured backups are started and if a backup is needed a quorum vote in initiated - */ - public void start() - { - if (started) - return; - server.getClusterManager().getQuorumManager().registerQuorumHandler(new RequestBackupQuorumVoteHandler()); - if (backupServerConfigurations != null) - { - for (Configuration configuration : backupServerConfigurations) - { - HornetQServer backup = server.createBackupServer(configuration); - backupServers.put(configuration.getName(), backup); - } - } - //start the backups - for (HornetQServer hornetQServer : backupServers.values()) - { - try - { - 
hornetQServer.start(); - } - catch (Exception e) - { - e.printStackTrace(); - } - } - - //vote for a backup if required - if (haPolicy.isRequestBackup()) - { - server.getClusterManager().getQuorumManager().vote(new RequestBackupQuorumVote()); - } - started = true; - } - - /** - * stop any backups - */ - public void stop() - { - for (HornetQServer hornetQServer : backupServers.values()) - { - try - { - hornetQServer.stop(); - } - catch (Exception e) - { - e.printStackTrace(); - //todo - } - } - backupServers.clear(); - started = false; - } - - @Override - public boolean isStarted() - { - return started; - } - - public synchronized boolean activateSharedStoreBackup(int backupSize, String journalDirectory, String bindingsDirectory, String largeMessagesDirectory, String pagingDirectory) throws Exception - { - if (backupServers.size() >= haPolicy.getMaxBackups() || backupSize != backupServers.size()) - { - return false; - } - Configuration configuration = server.getConfiguration().copy(); - HornetQServer backup = server.createBackupServer(configuration); - try - { - int portOffset = haPolicy.getBackupPortOffset() * (backupServers.size() + 1); - String name = "colocated_backup_" + backupServers.size() + 1; - updateSharedStoreConfiguration(configuration, haPolicy.getBackupStrategy(), name, portOffset, haPolicy.getRemoteConnectors(), journalDirectory, bindingsDirectory, largeMessagesDirectory, pagingDirectory); - //make sure we don't restart as we are colocated - configuration.getHAPolicy().setRestartBackup(false); - backupServers.put(configuration.getName(), backup); - backup.start(); - } - catch (Exception e) - { - backup.stop(); - //todo log a warning - return false; - } - return true; - } - - /** - * activate a backup server replicating from a specified node. - * - * @param backupSize the number of backups the requesting server thinks there are. 
if this is changed then we should - * decline and the requesting server can cast a re vote - * @param nodeID the id of the node to replicate from - * @return true if the server was created and started - * @throws Exception - */ - public synchronized boolean activateReplicatedBackup(int backupSize, SimpleString nodeID) throws Exception - { - if (backupServers.size() >= haPolicy.getMaxBackups() || backupSize != backupServers.size()) - { - return false; - } - Configuration configuration = server.getConfiguration().copy(); - HornetQServer backup = server.createBackupServer(configuration); - try - { - TopologyMember member = server.getClusterManager().getDefaultConnection(null).getTopology().getMember(nodeID.toString()); - int portOffset = haPolicy.getBackupPortOffset() * (backupServers.size() + 1); - String name = "colocated_backup_" + backupServers.size() + 1; - updateReplicatedConfiguration(configuration, haPolicy.getBackupStrategy(), name, portOffset, haPolicy.getRemoteConnectors()); - backup.addActivationParam(ActivationParams.REPLICATION_ENDPOINT, member); - backupServers.put(configuration.getName(), backup); - backup.start(); - } - catch (Exception e) - { - backup.stop(); - HornetQServerLogger.LOGGER.activateReplicatedBackupFailed(e); - return false; - } - return true; - } - /** * return the current backup servers * * @return the backups */ - public Map<String, HornetQServer> getBackupServers() - { - return backupServers; - } - - /** - * send a request to a live server to start a backup for us - * - * @param connectorPair the connector for the node to request a backup from - * @param backupSize the current size of the requested nodes backups - * @return true if the request wa successful. 
- * @throws Exception - */ - private boolean requestBackup(Pair<TransportConfiguration, TransportConfiguration> connectorPair, int backupSize) throws Exception - { - ClusterController clusterController = server.getClusterManager().getClusterController(); - try - ( - ClusterControl clusterControl = clusterController.connectToNode(connectorPair.getA()); - ) - { - clusterControl.authorize(); - if (haPolicy.getPolicyType() == HAPolicy.POLICY_TYPE.COLOCATED_SHARED_STORE) - { - - return clusterControl.requestSharedStoreBackup(backupSize, - server.getConfiguration().getJournalDirectory(), - server.getConfiguration().getBindingsDirectory(), - server.getConfiguration().getLargeMessagesDirectory(), - server.getConfiguration().getPagingDirectory()); - } - else - { - return clusterControl.requestReplicatedBackup(backupSize, server.getNodeID()); - - } - } - } - - /** - * update the backups configuration - * @param backupConfiguration the configuration to update - * @param backupStrategy the strategy for the backup - * @param name the new name of the backup - * @param portOffset the offset for the acceptors and any connectors that need changing - * @param remoteConnectors the connectors that don't need off setting, typically remote - * @param journalDirectory - * @param bindingsDirectory - * @param largeMessagesDirectory - * @param pagingDirectory - */ - private static void updateSharedStoreConfiguration(Configuration backupConfiguration, - BackupStrategy backupStrategy, - String name, - int portOffset, - List<String> remoteConnectors, - String journalDirectory, - String bindingsDirectory, - String largeMessagesDirectory, - String pagingDirectory) - { - backupConfiguration.getHAPolicy().setBackupStrategy(backupStrategy); - backupConfiguration.setName(name); - backupConfiguration.setJournalDirectory(journalDirectory); - backupConfiguration.setBindingsDirectory(bindingsDirectory); - backupConfiguration.setLargeMessagesDirectory(largeMessagesDirectory); - 
backupConfiguration.setPagingDirectory(pagingDirectory); - backupConfiguration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE); - updateAcceptorsAndConnectors(backupConfiguration, portOffset, remoteConnectors); - } - - /** - * update the backups configuration - * - * @param backupConfiguration the configuration to update - * @param backupStrategy the strategy for the backup - * @param name the new name of the backup - * @param portOffset the offset for the acceptors and any connectors that need changing - * @param remoteConnectors the connectors that don't need off setting, typically remote - */ - private static void updateReplicatedConfiguration(Configuration backupConfiguration, - BackupStrategy backupStrategy, - String name, - int portOffset, - List<String> remoteConnectors) - { - backupConfiguration.getHAPolicy().setBackupStrategy(backupStrategy); - backupConfiguration.setName(name); - backupConfiguration.setJournalDirectory(backupConfiguration.getJournalDirectory() + name); - backupConfiguration.setPagingDirectory(backupConfiguration.getPagingDirectory() + name); - backupConfiguration.setLargeMessagesDirectory(backupConfiguration.getLargeMessagesDirectory() + name); - backupConfiguration.setBindingsDirectory(backupConfiguration.getBindingsDirectory() + name); - backupConfiguration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_REPLICATED); - updateAcceptorsAndConnectors(backupConfiguration, portOffset, remoteConnectors); - } - - private static void updateAcceptorsAndConnectors(Configuration backupConfiguration, int portOffset, List<String> remoteConnectors) - { - //we only do this if we are a full server, if scale down then our acceptors wont be needed and our connectors will - // be the same as the parent server - if (backupConfiguration.getHAPolicy().getBackupStrategy() == BackupStrategy.FULL) - { - Set<TransportConfiguration> acceptors = backupConfiguration.getAcceptorConfigurations(); - for (TransportConfiguration acceptor : 
acceptors) - { - updatebackupParams(backupConfiguration.getName(), portOffset, acceptor.getParams()); - } - Map<String, TransportConfiguration> connectorConfigurations = backupConfiguration.getConnectorConfigurations(); - for (Map.Entry<String, TransportConfiguration> entry : connectorConfigurations.entrySet()) - { - //check to make sure we aren't a remote connector as this shouldn't be changed - if (!remoteConnectors.contains(entry.getValue().getName())) - { - updatebackupParams(backupConfiguration.getName(), portOffset, entry.getValue().getParams()); - } - } - } - else if (backupConfiguration.getHAPolicy().getBackupStrategy() == BackupStrategy.SCALE_DOWN) - { - //if we are scaling down then we wont need any acceptors but clear anyway for belts and braces - backupConfiguration.getAcceptorConfigurations().clear(); - } - } - - private static void updatebackupParams(String name, int portOffset, Map<String, Object> params) - { - if (params != null) - { - Object port = params.get("port"); - if (port != null) - { - Integer integer = Integer.valueOf(port.toString()); - integer += portOffset; - params.put("port", integer.toString()); - } - Object serverId = params.get("server-id"); - if (serverId != null) - { - params.put("server-id", serverId.toString() + "(" + name + ")"); - } - } - } - - public HAPolicy getHAPolicy() - { - return haPolicy; - } - - public ServerLocatorInternal getScaleDownConnector() throws HornetQException - { - if (!haPolicy.getScaleDownConnectors().isEmpty()) - { - return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(connectorNameListToArray(haPolicy.getScaleDownConnectors())); - } - else if (haPolicy.getScaleDownDiscoveryGroup() != null) - { - DiscoveryGroupConfiguration dg = server.getConfiguration().getDiscoveryGroupConfigurations().get(haPolicy.getScaleDownDiscoveryGroup()); - - if (dg == null) - { - throw HornetQMessageBundle.BUNDLE.noDiscoveryGroupFound(dg); - } - return (ServerLocatorInternal) 
HornetQClient.createServerLocatorWithHA(dg); - } - else - { - Map<String, TransportConfiguration> connectorConfigurations = server.getConfiguration().getConnectorConfigurations(); - for (TransportConfiguration transportConfiguration : connectorConfigurations.values()) - { - if (transportConfiguration.getFactoryClassName().equals(InVMConnectorFactory.class.getName())) - { - return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(transportConfiguration); - } - } - } - throw HornetQMessageBundle.BUNDLE.noConfigurationFoundForScaleDown(); - } - - - - private TransportConfiguration[] connectorNameListToArray(final List<String> connectorNames) - { - TransportConfiguration[] tcConfigs = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, - connectorNames.size()); - int count = 0; - for (String connectorName : connectorNames) - { - TransportConfiguration connector = server.getConfiguration().getConnectorConfigurations().get(connectorName); - - if (connector == null) - { - HornetQServerLogger.LOGGER.bridgeNoConnector(connectorName); - - return null; - } - - tcConfigs[count++] = connector; - } - - return tcConfigs; - } - /** - * A vote handler for incoming backup request votes - */ - private final class RequestBackupQuorumVoteHandler implements QuorumVoteHandler - { - @Override - public Vote vote(Vote vote) - { - return new RequestBackupVote(backupServers.size(), server.getNodeID().toString(), backupServers.size() < haPolicy.getMaxBackups()); - } - - @Override - public SimpleString getQuorumName() - { - return REQUEST_BACKUP_QUORUM_VOTE; - } - - @Override - public Vote decode(HornetQBuffer voteBuffer) - { - RequestBackupVote requestBackupVote = new RequestBackupVote(); - requestBackupVote.decode(voteBuffer); - return requestBackupVote; - } - } - - /** - * a quorum vote for backup requests - */ - private final class RequestBackupQuorumVote extends QuorumVote<RequestBackupVote, Pair<String, Integer>> - { - //the available nodes that we can 
request - private final List<Pair<String, Integer>> nodes = new ArrayList<>(); - - public RequestBackupQuorumVote() - { - super(REQUEST_BACKUP_QUORUM_VOTE); - } - - @Override - public Vote connected() - { - return new RequestBackupVote(); - } - - @Override - public Vote notConnected() - { - return new RequestBackupVote(); - } - - @Override - public void vote(RequestBackupVote vote) - { - //if the returned vote is available add it to the nodes we can request - if (vote.backupAvailable) - { - nodes.add(vote.getVote()); - } - } - - @Override - public Pair<String, Integer> getDecision() - { - //sort the nodes by how many backups they have and choose the first - Collections.sort(nodes, new Comparator<Pair<String, Integer>>() - { - @Override - public int compare(Pair<String, Integer> o1, Pair<String, Integer> o2) - { - return o1.getB().compareTo(o2.getB()); - } - }); - return nodes.get(0); - } - - @Override - public void allVotesCast(Topology voteTopology) - { - //if we have any nodes that we can request then send a request - if (nodes.size() > 0) - { - Pair<String, Integer> decision = getDecision(); - TopologyMemberImpl member = voteTopology.getMember(decision.getA()); - try - { - boolean backupStarted = requestBackup(member.getConnector(), decision.getB().intValue()); - if (!backupStarted) - { - nodes.clear(); - server.getScheduledPool().schedule(new Runnable() - { - @Override - public void run() - { - server.getClusterManager().getQuorumManager().vote(new RequestBackupQuorumVote()); - } - }, haPolicy.getBackupRequestRetryInterval(), TimeUnit.MILLISECONDS); - } - } - catch (Exception e) - { - e.printStackTrace(); - //todo - } - } - else - { - nodes.clear(); - server.getScheduledPool().schedule(new Runnable() - { - @Override - public void run() - { - server.getClusterManager().getQuorumManager().vote(RequestBackupQuorumVote.this); - } - }, haPolicy.getBackupRequestRetryInterval(), TimeUnit.MILLISECONDS); - } - } - - @Override - public SimpleString getName() - { - return 
REQUEST_BACKUP_QUORUM_VOTE; - } - } - - class RequestBackupVote extends Vote<Pair<String, Integer>> - { - private int backupsSize; - private String nodeID; - private boolean backupAvailable; - - public RequestBackupVote() - { - backupsSize = -1; - } - - public RequestBackupVote(int backupsSize, String nodeID, boolean backupAvailable) - { - this.backupsSize = backupsSize; - this.nodeID = nodeID; - this.backupAvailable = backupAvailable; - } - - @Override - public void encode(HornetQBuffer buff) - { - buff.writeInt(backupsSize); - buff.writeNullableString(nodeID); - buff.writeBoolean(backupAvailable); - } - - @Override - public void decode(HornetQBuffer buff) - { - backupsSize = buff.readInt(); - nodeID = buff.readNullableString(); - backupAvailable = buff.readBoolean(); - } - - @Override - public boolean isRequestServerVote() - { - return true; - } - - @Override - public Pair<String, Integer> getVote() - { - return new Pair<>(nodeID, backupsSize); - } - } + Map<String, HornetQServer> getBackupServers(); }
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAPolicy.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAPolicy.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAPolicy.java index ed7c6de..a4003a9 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAPolicy.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAPolicy.java @@ -12,344 +12,36 @@ */ package org.hornetq.core.server.cluster.ha; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.List; -import java.util.Set; +import java.util.Map; -import org.hornetq.api.config.HornetQDefaultConfiguration; -import org.hornetq.core.config.BackupStrategy; +import org.hornetq.core.server.impl.Activation; +import org.hornetq.core.server.impl.HornetQServerImpl; /** * Every live server will have an HAPolicy that configures the type of server that it should be either live, backup or * colocated (both). It also configures how, if colocated, it should react to sending and receiving requests for backups. */ -public class HAPolicy implements Serializable +public interface HAPolicy<T extends Activation> { - /** - * the policy type for a server - */ - public enum POLICY_TYPE - { - NONE((byte) 0), - REPLICATED((byte) 1), - SHARED_STORE((byte) 2), - BACKUP_REPLICATED((byte) 3), - BACKUP_SHARED_STORE((byte) 4), - COLOCATED_REPLICATED((byte) 5), - COLOCATED_SHARED_STORE((byte) 6); + /* + * created the Activation associated with this policy. 
+ * */ + T createActivation(HornetQServerImpl server, boolean wasLive, Map<String, Object> activationParams, HornetQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) throws Exception; - private static final Set<POLICY_TYPE> all = EnumSet.allOf(POLICY_TYPE.class); - private final byte type; + boolean isSharedStore(); - POLICY_TYPE(byte type) - { - this.type = type; - } + boolean isBackup(); - public byte getType() - { - return type; - } + boolean canScaleDown(); - public static POLICY_TYPE toBackupType(byte b) - { - for (POLICY_TYPE backupType : all) - { - if (b == backupType.getType()) - { - return backupType; - } - } - return null; - } - } + /* + * todo These 3 methods could probably be moved as they are specific to the activation however they are needed for certain packets. + * */ - private POLICY_TYPE policyType = POLICY_TYPE.valueOf(HornetQDefaultConfiguration.getDefaultHapolicyType()); + String getBackupGroupName(); - private boolean requestBackup = HornetQDefaultConfiguration.isDefaultHapolicyRequestBackup(); + String getScaleDownGroupName(); - private int backupRequestRetries = HornetQDefaultConfiguration.getDefaultHapolicyBackupRequestRetries(); + String getScaleDownClustername(); - private long backupRequestRetryInterval = HornetQDefaultConfiguration.getDefaultHapolicyBackupRequestRetryInterval(); - - private int maxBackups = HornetQDefaultConfiguration.getDefaultHapolicyMaxBackups(); - - private int backupPortOffset = HornetQDefaultConfiguration.getDefaultHapolicyBackupPortOffset(); - - private BackupStrategy backupStrategy = BackupStrategy.valueOf(HornetQDefaultConfiguration.getDefaultHapolicyBackupStrategy()); - - private List<String> scaleDownConnectors = new ArrayList<>(); - - private String scaleDownDiscoveryGroup = null; - - private String scaleDownGroupName = null; - - private String backupGroupName = null; - - private List<String> remoteConnectors = new ArrayList<>(); - - private boolean checkForLiverServer = 
HornetQDefaultConfiguration.isDefaultCheckForLiveServer(); - - private boolean allowAutoFailBack = HornetQDefaultConfiguration.isDefaultAllowAutoFailback(); - - private long failbackDelay = HornetQDefaultConfiguration.getDefaultFailbackDelay(); - - private boolean failoverOnServerShutdown = HornetQDefaultConfiguration.isDefaultFailoverOnServerShutdown(); - - private String replicationClusterName; - - private String scaleDownClusterName; - - private int maxSavedReplicatedJournalsSize = HornetQDefaultConfiguration.getDefaultMaxSavedReplicatedJournalsSize(); - - private boolean scaleDown = HornetQDefaultConfiguration.isDefaultScaleDown(); - - private boolean restartBackup = HornetQDefaultConfiguration.isDefaultRestartBackup(); - - public POLICY_TYPE getPolicyType() - { - return policyType; - } - - public void setPolicyType(POLICY_TYPE policyType) - { - this.policyType = policyType; - } - - public BackupStrategy getBackupStrategy() - { - return backupStrategy; - } - - public void setBackupStrategy(BackupStrategy backupStrategy) - { - this.backupStrategy = backupStrategy; - } - - /** - * Should we scaleDown our messages when the server is shutdown cleanly. - * - * @return true if server should scaleDown its messages on clean shutdown - * @see #setScaleDown(boolean) - */ - public boolean isScaleDown() - { - return scaleDown; - } - - /** - * Sets whether to allow the server to scaleDown its messages on server shutdown. - */ - public void setScaleDown(boolean scaleDown) - { - this.scaleDown = scaleDown; - } - - /** - * returns the name used to group - * - * @return the name of the group - */ - public String getScaleDownGroupName() - { - return scaleDownGroupName; - } - - /** - * Used to configure groups of live/backup servers. 
- * - * @param nodeGroupName the node group name - */ - public void setScaleDownGroupName(String nodeGroupName) - { - this.scaleDownGroupName = nodeGroupName; - } - - public String getBackupGroupName() - { - return backupGroupName; - } - - public void setBackupGroupName(String backupGroupName) - { - this.backupGroupName = backupGroupName; - } - - public List<String> getScaleDownConnectors() - { - return scaleDownConnectors; - } - - public void setScaleDownConnectors(List<String> scaleDownConnectors) - { - this.scaleDownConnectors = scaleDownConnectors; - } - - public void setScaleDownDiscoveryGroup(String scaleDownDiscoveryGroup) - { - this.scaleDownDiscoveryGroup = scaleDownDiscoveryGroup; - } - - public String getScaleDownDiscoveryGroup() - { - return scaleDownDiscoveryGroup; - } - - public boolean isRequestBackup() - { - return requestBackup; - } - - public void setRequestBackup(boolean requestBackup) - { - this.requestBackup = requestBackup; - } - - public int getBackupRequestRetries() - { - return backupRequestRetries; - } - - public void setBackupRequestRetries(int backupRequestRetries) - { - this.backupRequestRetries = backupRequestRetries; - } - - public long getBackupRequestRetryInterval() - { - return backupRequestRetryInterval; - } - - public void setBackupRequestRetryInterval(long backupRequestRetryInterval) - { - this.backupRequestRetryInterval = backupRequestRetryInterval; - } - - public int getMaxBackups() - { - return maxBackups; - } - - public void setMaxBackups(int maxBackups) - { - this.maxBackups = maxBackups; - } - - public int getBackupPortOffset() - { - return backupPortOffset; - } - - public void setBackupPortOffset(int backupPortOffset) - { - this.backupPortOffset = backupPortOffset; - } - - public List<String> getRemoteConnectors() - { - return remoteConnectors; - } - - public void setRemoteConnectors(List<String> remoteConnectors) - { - this.remoteConnectors = remoteConnectors; - } - - public boolean isCheckForLiveServer() - { - return 
checkForLiverServer; - } - - public void setCheckForLiveServer(boolean checkForLiverServer) - { - this.checkForLiverServer = checkForLiverServer; - } - - public boolean isAllowAutoFailBack() - { - return allowAutoFailBack; - } - - public void setAllowAutoFailBack(boolean allowAutoFailBack) - { - this.allowAutoFailBack = allowAutoFailBack; - } - - public long getFailbackDelay() - { - return failbackDelay; - } - - public void setFailbackDelay(long failbackDelay) - { - this.failbackDelay = failbackDelay; - } - - public boolean isFailoverOnServerShutdown() - { - return failoverOnServerShutdown; - } - - public void setFailoverOnServerShutdown(boolean failoverOnServerShutdown) - { - this.failoverOnServerShutdown = failoverOnServerShutdown; - } - - public void setReplicationClustername(String clusterName) - { - this.replicationClusterName = clusterName; - } - - public String getReplicationClustername() - { - return replicationClusterName; - } - - public void setScaleDownClustername(String clusterName) - { - this.scaleDownClusterName = clusterName; - } - - public String getScaleDownClustername() - { - return scaleDownClusterName; - } - - public void setMaxSavedReplicatedJournalSize(int maxSavedReplicatedJournalsSize) - { - this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize; - } - - public int getMaxSavedReplicatedJournalsSize() - { - return maxSavedReplicatedJournalsSize; - } - - public boolean isRestartBackup() - { - return restartBackup; - } - - public void setRestartBackup(boolean restartBackup) - { - this.restartBackup = restartBackup; - } - - public boolean isSharedStore() - { - if (policyType == POLICY_TYPE.BACKUP_SHARED_STORE || policyType == POLICY_TYPE.SHARED_STORE || policyType == POLICY_TYPE.COLOCATED_SHARED_STORE) - return true; - else - return false; - } - - public boolean isBackup() - { - if (policyType == POLICY_TYPE.BACKUP_SHARED_STORE || policyType == POLICY_TYPE.BACKUP_REPLICATED) - return true; - else - return false; - } } 
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAPolicyTemplate.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAPolicyTemplate.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAPolicyTemplate.java deleted file mode 100644 index cd9a09d..0000000 --- a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/HAPolicyTemplate.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package org.hornetq.core.server.cluster.ha; - - -import org.hornetq.core.config.BackupStrategy; - -/** - * a template for creating policy. this makes it easier in configuration and in embedded code to create a certain type - * of policy. 
for instance: - * - * <ha-policy template="COLOCATED_REPLICATED"/> - * - * or in code - * - * HAPolicy policy = HAPolicyTemplate.COLOCATED_REPLICATED.getHaPolicy() - */ -public enum HAPolicyTemplate -{ - NONE(createNonePolicy()), - REPLICATED(createReplicatedPolicy()), - SHARED_STORE(createSharedStorePolicy()), - BACKUP_REPLICATED(createBackupReplicatedPolicy()), - BACKUP_SHARED_STORE(createBackupSharedStorePolicy()), - COLOCATED_REPLICATED(createColocatedReplicatedPolicy()), - COLOCATED_SHARED_STORE(createColocatedSharedStorePolicy()); - - private final HAPolicy haPolicy; - - public HAPolicy getHaPolicy() - { - return haPolicy; - } - - HAPolicyTemplate(HAPolicy haPolicy) - { - this.haPolicy = haPolicy; - } - - private static HAPolicy createNonePolicy() - { - HAPolicy policy = new HAPolicy(); - policy.setPolicyType(HAPolicy.POLICY_TYPE.NONE); - return policy; - } - - private static HAPolicy createReplicatedPolicy() - { - HAPolicy policy = new HAPolicy(); - policy.setPolicyType(HAPolicy.POLICY_TYPE.REPLICATED); - return policy; - } - - private static HAPolicy createSharedStorePolicy() - { - HAPolicy policy = new HAPolicy(); - policy.setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE); - return policy; - } - - private static HAPolicy createBackupReplicatedPolicy() - { - HAPolicy policy = new HAPolicy(); - policy.setMaxBackups(0); - policy.setRequestBackup(false); - policy.setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_REPLICATED); - policy.setBackupStrategy(BackupStrategy.FULL); - policy.setRestartBackup(true); - return policy; - } - - private static HAPolicy createBackupSharedStorePolicy() - { - HAPolicy policy = new HAPolicy(); - policy.setMaxBackups(0); - policy.setRequestBackup(false); - policy.setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE); - policy.setBackupStrategy(BackupStrategy.FULL); - policy.setRestartBackup(true); - return policy; - } - - private static HAPolicy createColocatedSharedStorePolicy() - { - HAPolicy policy = new HAPolicy(); - 
policy.setBackupPortOffset(100); - policy.setBackupRequestRetries(-1); - policy.setBackupRequestRetryInterval(5000); - policy.setMaxBackups(2); - policy.setRequestBackup(true); - policy.setPolicyType(HAPolicy.POLICY_TYPE.COLOCATED_SHARED_STORE); - policy.setBackupStrategy(BackupStrategy.SCALE_DOWN); - return policy; - } - - private static HAPolicy createColocatedReplicatedPolicy() - { - HAPolicy policy = new HAPolicy(); - policy.setBackupPortOffset(100); - policy.setBackupRequestRetries(-1); - policy.setBackupRequestRetryInterval(5000); - policy.setMaxBackups(2); - policy.setRequestBackup(true); - policy.setPolicyType(HAPolicy.POLICY_TYPE.COLOCATED_REPLICATED); - policy.setBackupStrategy(BackupStrategy.SCALE_DOWN); - return policy; - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/LiveOnlyPolicy.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/LiveOnlyPolicy.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/LiveOnlyPolicy.java new file mode 100644 index 0000000..499e1b3 --- /dev/null +++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/LiveOnlyPolicy.java @@ -0,0 +1,85 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.core.server.cluster.ha; + +import org.hornetq.core.server.impl.Activation; +import org.hornetq.core.server.impl.HornetQServerImpl; +import org.hornetq.core.server.impl.LiveOnlyActivation; + +import java.util.Map; + +public class LiveOnlyPolicy implements HAPolicy<Activation> +{ + private ScaleDownPolicy scaleDownPolicy; + + public LiveOnlyPolicy() + { + } + + public LiveOnlyPolicy(ScaleDownPolicy scaleDownPolicy) + { + this.scaleDownPolicy = scaleDownPolicy; + } + + @Override + public Activation createActivation(HornetQServerImpl server, boolean wasLive, Map<String, Object> activationParams, HornetQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) + { + return new LiveOnlyActivation(server, this); + } + + @Override + public String getBackupGroupName() + { + return null; + } + + @Override + public String getScaleDownGroupName() + { + return scaleDownPolicy == null ? null : scaleDownPolicy.getGroupName(); + } + + @Override + public String getScaleDownClustername() + { + return null; + } + + @Override + public boolean isSharedStore() + { + return false; + } + + @Override + public boolean isBackup() + { + return false; + } + + @Override + public boolean canScaleDown() + { + return scaleDownPolicy != null; + } + + public ScaleDownPolicy getScaleDownPolicy() + { + return scaleDownPolicy; + } + + public void setScaleDownPolicy(ScaleDownPolicy scaleDownPolicy) + { + this.scaleDownPolicy = scaleDownPolicy; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ReplicaPolicy.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ReplicaPolicy.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ReplicaPolicy.java new file mode 100644 index 0000000..64d591f --- /dev/null +++ 
b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ReplicaPolicy.java @@ -0,0 +1,136 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.server.cluster.ha; + +import org.hornetq.api.config.HornetQDefaultConfiguration; +import org.hornetq.core.server.impl.Activation; +import org.hornetq.core.server.impl.HornetQServerImpl; +import org.hornetq.core.server.impl.SharedNothingBackupActivation; + +import java.util.Map; + +public class ReplicaPolicy extends BackupPolicy +{ + private String clusterName; + + private int maxSavedReplicatedJournalsSize = HornetQDefaultConfiguration.getDefaultMaxSavedReplicatedJournalsSize(); + + private String groupName = null; + + private boolean restartBackup = HornetQDefaultConfiguration.isDefaultRestartBackup(); + + private ReplicatedPolicy replicatedPolicy; + + public ReplicaPolicy() + { + } + + public ReplicaPolicy(String clusterName, int maxSavedReplicatedJournalsSize, String groupName, boolean restartBackup, boolean allowFailback, long failbackDelay, ScaleDownPolicy scaleDownPolicy) + { + this.clusterName = clusterName; + this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize; + this.groupName = groupName; + this.restartBackup = restartBackup; + this.scaleDownPolicy = scaleDownPolicy; + //todo check default settings + replicatedPolicy = new ReplicatedPolicy(false, allowFailback, failbackDelay, groupName, 
clusterName, this); + } + + public ReplicaPolicy(String clusterName, int maxSavedReplicatedJournalsSize, String groupName, ReplicatedPolicy replicatedPolicy) + { + this.clusterName = clusterName; + this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize; + this.groupName = groupName; + this.replicatedPolicy = replicatedPolicy; + } + + public String getClusterName() + { + return clusterName; + } + + public void setClusterName(String clusterName) + { + this.clusterName = clusterName; + } + + public int getMaxSavedReplicatedJournalsSize() + { + return maxSavedReplicatedJournalsSize; + } + + public void setMaxSavedReplicatedJournalsSize(int maxSavedReplicatedJournalsSize) + { + this.maxSavedReplicatedJournalsSize = maxSavedReplicatedJournalsSize; + } + + public ReplicatedPolicy getReplicatedPolicy() + { + return replicatedPolicy; + } + + public void setReplicatedPolicy(ReplicatedPolicy replicatedPolicy) + { + this.replicatedPolicy = replicatedPolicy; + } + + /* + * these 2 methods are the same, leaving both as the second is correct but the first is needed until more refactoring is done + * */ + public String getBackupGroupName() + { + return groupName; + } + + public String getGroupName() + { + return groupName; + } + + public void setGroupName(String groupName) + { + this.groupName = groupName; + } + + public boolean isRestartBackup() + { + return restartBackup; + } + + public void setRestartBackup(boolean restartBackup) + { + this.restartBackup = restartBackup; + } + + @Override + public boolean isSharedStore() + { + return false; + } + + @Override + public boolean canScaleDown() + { + return scaleDownPolicy != null; + } + + @Override + public Activation createActivation(HornetQServerImpl server, boolean wasLive, + Map<String, Object> activationParams, + HornetQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) throws Exception + { + SharedNothingBackupActivation backupActivation = new SharedNothingBackupActivation(server, wasLive, 
activationParams, shutdownOnCriticalIO, this); + backupActivation.init(); + return backupActivation; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ReplicatedPolicy.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ReplicatedPolicy.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ReplicatedPolicy.java new file mode 100644 index 0000000..cfb4439 --- /dev/null +++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ReplicatedPolicy.java @@ -0,0 +1,167 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.core.server.cluster.ha; + +import org.hornetq.api.config.HornetQDefaultConfiguration; +import org.hornetq.core.server.impl.HornetQServerImpl; +import org.hornetq.core.server.impl.LiveActivation; +import org.hornetq.core.server.impl.SharedNothingLiveActivation; + +import java.util.Map; + +public class ReplicatedPolicy implements HAPolicy<LiveActivation> +{ + private boolean checkForLiveServer = HornetQDefaultConfiguration.isDefaultCheckForLiveServer(); + + private String groupName = null; + + private String clusterName; + + /* + * these are only set by the ReplicaPolicy after failover to decide if the live server can failback, these should not + * be exposed in configuration. + * */ + private boolean allowAutoFailBack = HornetQDefaultConfiguration.isDefaultAllowAutoFailback(); + + private long failbackDelay = HornetQDefaultConfiguration.getDefaultFailbackDelay(); + + /* + * this is used as the policy when the server is started after a failover + * */ + private ReplicaPolicy replicaPolicy; + + + public ReplicatedPolicy() + { + } + + public ReplicatedPolicy(boolean checkForLiveServer, String groupName, String clusterName) + { + this.checkForLiveServer = checkForLiveServer; + this.groupName = groupName; + this.clusterName = clusterName; + /* + * we create this with sensible defaults in case we start after a failover + * */ + replicaPolicy = new ReplicaPolicy(clusterName, -1, groupName, this); + } + + public ReplicatedPolicy(boolean checkForLiveServer, boolean allowAutoFailBack, long failbackDelay, String groupName, String clusterName, ReplicaPolicy replicaPolicy) + { + this.checkForLiveServer = checkForLiveServer; + this.clusterName = clusterName; + this.groupName = groupName; + this.allowAutoFailBack = allowAutoFailBack; + this.failbackDelay = failbackDelay; + this.replicaPolicy = replicaPolicy; + } + + public boolean isCheckForLiveServer() + { + return checkForLiveServer; + } + + public void setCheckForLiveServer(boolean checkForLiveServer) 
+ { + this.checkForLiveServer = checkForLiveServer; + } + + public boolean isAllowAutoFailBack() + { + return allowAutoFailBack; + } + + public long getFailbackDelay() + { + return failbackDelay; + } + + public void setFailbackDelay(long failbackDelay) + { + this.failbackDelay = failbackDelay; + } + + public String getClusterName() + { + return clusterName; + } + + public void setClusterName(String clusterName) + { + this.clusterName = clusterName; + } + + public ReplicaPolicy getReplicaPolicy() + { + return replicaPolicy; + } + + public void setReplicaPolicy(ReplicaPolicy replicaPolicy) + { + this.replicaPolicy = replicaPolicy; + } + + /* + * these 2 methods are the same, leaving both as the second is correct but the first is needed until more refactoring is done + * */ + public String getBackupGroupName() + { + return groupName; + } + + public String getGroupName() + { + return groupName; + } + + @Override + public String getScaleDownGroupName() + { + return null; + } + + public void setGroupName(String groupName) + { + this.groupName = groupName; + } + + @Override + public boolean isSharedStore() + { + return false; + } + + @Override + public boolean isBackup() + { + return false; + } + + @Override + public boolean canScaleDown() + { + return false; + } + + @Override + public String getScaleDownClustername() + { + return null; + } + + @Override + public LiveActivation createActivation(HornetQServerImpl server, boolean wasLive, Map<String, Object> activationParams, HornetQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) + { + return new SharedNothingLiveActivation(server, this); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ScaleDownPolicy.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ScaleDownPolicy.java 
b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ScaleDownPolicy.java new file mode 100644 index 0000000..a5fee77 --- /dev/null +++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/ScaleDownPolicy.java @@ -0,0 +1,164 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.server.cluster.ha; + +import org.hornetq.api.core.DiscoveryGroupConfiguration; +import org.hornetq.api.core.HornetQException; +import org.hornetq.api.core.TransportConfiguration; +import org.hornetq.api.core.client.HornetQClient; +import org.hornetq.core.client.impl.ServerLocatorInternal; +import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory; +import org.hornetq.core.server.HornetQMessageBundle; +import org.hornetq.core.server.HornetQServer; +import org.hornetq.core.server.HornetQServerLogger; + +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class ScaleDownPolicy +{ + private List<String> connectors = new ArrayList<>(); + + private String discoveryGroup = null; + + private String groupName = null; + + private String clusterName; + + private boolean enabled; + + public ScaleDownPolicy() + { + } + + public ScaleDownPolicy(List<String> connectors, String groupName, String clusterName, boolean enabled) + { + this.connectors = connectors; + this.groupName = groupName; + this.clusterName = 
clusterName; + this.enabled = enabled; + } + + public ScaleDownPolicy(String discoveryGroup, String groupName, String clusterName, boolean enabled) + { + this.discoveryGroup = discoveryGroup; + this.groupName = groupName; + this.clusterName = clusterName; + this.enabled = enabled; + } + + + public List<String> getConnectors() + { + return connectors; + } + + public void setConnectors(List<String> connectors) + { + this.connectors = connectors; + } + + public String getDiscoveryGroup() + { + return discoveryGroup; + } + + public void setDiscoveryGroup(String discoveryGroup) + { + this.discoveryGroup = discoveryGroup; + } + + public String getGroupName() + { + return groupName; + } + + public void setGroupName(String groupName) + { + this.groupName = groupName; + } + + public String getClusterName() + { + return clusterName; + } + + public void setClusterName(String clusterName) + { + this.clusterName = clusterName; + } + + public boolean isEnabled() + { + return enabled; + } + + public void setEnabled(boolean enabled) + { + this.enabled = enabled; + } + + public static ServerLocatorInternal getScaleDownConnector(ScaleDownPolicy scaleDownPolicy, HornetQServer hornetQServer) throws HornetQException + { + if (!scaleDownPolicy.getConnectors().isEmpty()) + { + return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(connectorNameListToArray(scaleDownPolicy.getConnectors(), hornetQServer)); + } + else if (scaleDownPolicy.getDiscoveryGroup() != null) + { + DiscoveryGroupConfiguration dg = hornetQServer.getConfiguration().getDiscoveryGroupConfigurations().get(scaleDownPolicy.getDiscoveryGroup()); + + if (dg == null) + { + throw HornetQMessageBundle.BUNDLE.noDiscoveryGroupFound(dg); + } + return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(dg); + } + else + { + Map<String, TransportConfiguration> connectorConfigurations = hornetQServer.getConfiguration().getConnectorConfigurations(); + for (TransportConfiguration transportConfiguration : 
connectorConfigurations.values()) + { + if (transportConfiguration.getFactoryClassName().equals(InVMConnectorFactory.class.getName())) + { + return (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(transportConfiguration); + } + } + } + throw HornetQMessageBundle.BUNDLE.noConfigurationFoundForScaleDown(); + } + + private static TransportConfiguration[] connectorNameListToArray(final List<String> connectorNames, HornetQServer hornetQServer) + { + TransportConfiguration[] tcConfigs = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, + connectorNames.size()); + int count = 0; + for (String connectorName : connectorNames) + { + TransportConfiguration connector = hornetQServer.getConfiguration().getConnectorConfigurations().get(connectorName); + + if (connector == null) + { + HornetQServerLogger.LOGGER.bridgeNoConnector(connectorName); + + return null; + } + + tcConfigs[count++] = connector; + } + + return tcConfigs; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/SharedStoreMasterPolicy.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/SharedStoreMasterPolicy.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/SharedStoreMasterPolicy.java new file mode 100644 index 0000000..5aa848f --- /dev/null +++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/SharedStoreMasterPolicy.java @@ -0,0 +1,111 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.server.cluster.ha; + +import org.hornetq.api.config.HornetQDefaultConfiguration; +import org.hornetq.core.server.impl.HornetQServerImpl; +import org.hornetq.core.server.impl.LiveActivation; +import org.hornetq.core.server.impl.SharedStoreLiveActivation; + +import java.util.Map; + +public class SharedStoreMasterPolicy implements HAPolicy<LiveActivation> +{ + private long failbackDelay = HornetQDefaultConfiguration.getDefaultFailbackDelay(); + + private boolean failoverOnServerShutdown = HornetQDefaultConfiguration.isDefaultFailoverOnServerShutdown(); + + private SharedStoreSlavePolicy sharedStoreSlavePolicy; + + public SharedStoreMasterPolicy() + { + } + + public SharedStoreMasterPolicy(long failbackDelay, boolean failoverOnServerShutdown) + { + this.failbackDelay = failbackDelay; + this.failoverOnServerShutdown = failoverOnServerShutdown; + } + + public long getFailbackDelay() + { + return failbackDelay; + } + + public void setFailbackDelay(long failbackDelay) + { + this.failbackDelay = failbackDelay; + } + + public boolean isFailoverOnServerShutdown() + { + return failoverOnServerShutdown; + } + + public void setFailoverOnServerShutdown(boolean failoverOnServerShutdown) + { + this.failoverOnServerShutdown = failoverOnServerShutdown; + } + + public SharedStoreSlavePolicy getSharedStoreSlavePolicy() + { + return sharedStoreSlavePolicy; + } + + public void setSharedStoreSlavePolicy(SharedStoreSlavePolicy sharedStoreSlavePolicy) + { + this.sharedStoreSlavePolicy = sharedStoreSlavePolicy; + } + + @Override + public boolean 
isSharedStore() + { + return true; + } + + @Override + public boolean isBackup() + { + return false; + } + + @Override + public boolean canScaleDown() + { + return false; + } + + @Override + public LiveActivation createActivation(HornetQServerImpl server, boolean wasLive, Map<String, Object> activationParams, HornetQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) + { + return new SharedStoreLiveActivation(server, this); + } + + @Override + public String getBackupGroupName() + { + return null; + } + + @Override + public String getScaleDownGroupName() + { + return null; + } + + @Override + public String getScaleDownClustername() + { + return null; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/SharedStoreSlavePolicy.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/SharedStoreSlavePolicy.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/SharedStoreSlavePolicy.java new file mode 100644 index 0000000..df6890f --- /dev/null +++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/SharedStoreSlavePolicy.java @@ -0,0 +1,110 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.core.server.cluster.ha; + +import org.hornetq.api.config.HornetQDefaultConfiguration; +import org.hornetq.core.server.impl.Activation; +import org.hornetq.core.server.impl.HornetQServerImpl; +import org.hornetq.core.server.impl.SharedStoreBackupActivation; + +import java.util.Map; + +public class SharedStoreSlavePolicy extends BackupPolicy +{ + private long failbackDelay = HornetQDefaultConfiguration.getDefaultFailbackDelay(); + + private boolean failoverOnServerShutdown = HornetQDefaultConfiguration.isDefaultFailoverOnServerShutdown(); + + private boolean allowAutoFailBack = HornetQDefaultConfiguration.isDefaultAllowAutoFailback(); + + //this is how we act once we have failed over + private SharedStoreMasterPolicy sharedStoreMasterPolicy; + + public SharedStoreSlavePolicy() + { + } + + public SharedStoreSlavePolicy(long failbackDelay, boolean failoverOnServerShutdown, boolean restartBackup, boolean allowAutoFailBack, ScaleDownPolicy scaleDownPolicy) + { + this.failbackDelay = failbackDelay; + this.failoverOnServerShutdown = failoverOnServerShutdown; + this.restartBackup = restartBackup; + this.allowAutoFailBack = allowAutoFailBack; + this.scaleDownPolicy = scaleDownPolicy; + sharedStoreMasterPolicy = new SharedStoreMasterPolicy(failbackDelay, failoverOnServerShutdown); + } + + public long getFailbackDelay() + { + return failbackDelay; + } + + public void setFailbackDelay(long failbackDelay) + { + this.failbackDelay = failbackDelay; + } + + public boolean isFailoverOnServerShutdown() + { + return failoverOnServerShutdown; + } + + public void setFailoverOnServerShutdown(boolean failoverOnServerShutdown) + { + this.failoverOnServerShutdown = failoverOnServerShutdown; + } + + public SharedStoreMasterPolicy getSharedStoreMasterPolicy() + { + return sharedStoreMasterPolicy; + } + + public void setSharedStoreMasterPolicy(SharedStoreMasterPolicy sharedStoreMasterPolicy) + { + this.sharedStoreMasterPolicy = sharedStoreMasterPolicy; + } + + 
@Override + public boolean isSharedStore() + { + return true; + } + + @Override + public boolean canScaleDown() + { + return scaleDownPolicy != null; + } + + public boolean isAllowAutoFailBack() + { + return allowAutoFailBack; + } + + public void setAllowAutoFailBack(boolean allowAutoFailBack) + { + this.allowAutoFailBack = allowAutoFailBack; + } + + @Override + public Activation createActivation(HornetQServerImpl server, boolean wasLive, Map<String, Object> activationParams, HornetQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) + { + return new SharedStoreBackupActivation(server, this); + } + + @Override + public String getBackupGroupName() + { + return null; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/StandaloneHAManager.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/StandaloneHAManager.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/StandaloneHAManager.java new file mode 100644 index 0000000..52934cd --- /dev/null +++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/ha/StandaloneHAManager.java @@ -0,0 +1,56 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.core.server.cluster.ha; + +import org.hornetq.core.server.HornetQServer; + +import java.util.HashMap; +import java.util.Map; + +/* +* this implementation doesn't really do anything at the minute but this may change so Im leaving it here, Andy... +* */ +public class StandaloneHAManager implements HAManager +{ + Map<String, HornetQServer> servers = new HashMap<>(); + + boolean isStarted = false; + + @Override + public Map<String, HornetQServer> getBackupServers() + { + return servers; + } + + @Override + public void start() throws Exception + { + if (isStarted) + return; + isStarted = true; + } + + @Override + public void stop() throws Exception + { + if (!isStarted) + return; + isStarted = false; + } + + @Override + public boolean isStarted() + { + return isStarted; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/BridgeImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/BridgeImpl.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/BridgeImpl.java index 5e8fc75..4b49bbf 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/BridgeImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/BridgeImpl.java @@ -33,7 +33,7 @@ import org.hornetq.api.core.client.ClusterTopologyListener; import org.hornetq.api.core.client.SendAcknowledgementHandler; import org.hornetq.api.core.client.SessionFailureListener; import org.hornetq.api.core.client.TopologyMember; -import org.hornetq.api.core.management.NotificationType; +import org.hornetq.api.core.management.CoreNotificationType; import org.hornetq.core.client.impl.ClientSessionFactoryImpl; import org.hornetq.core.client.impl.ClientSessionFactoryInternal; import org.hornetq.core.client.impl.ClientSessionInternal; @@ -262,7 +262,7 @@ public 
class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled { TypedProperties props = new TypedProperties(); props.putSimpleStringProperty(new SimpleString("name"), name); - Notification notification = new Notification(nodeUUID.toString(), NotificationType.BRIDGE_STARTED, props); + Notification notification = new Notification(nodeUUID.toString(), CoreNotificationType.BRIDGE_STARTED, props); notificationService.sendNotification(notification); } } @@ -388,7 +388,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled { TypedProperties props = new TypedProperties(); props.putSimpleStringProperty(new SimpleString("name"), name); - Notification notification = new Notification(nodeUUID.toString(), NotificationType.BRIDGE_STOPPED, props); + Notification notification = new Notification(nodeUUID.toString(), CoreNotificationType.BRIDGE_STOPPED, props); try { notificationService.sendNotification(notification); @@ -413,7 +413,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled { TypedProperties props = new TypedProperties(); props.putSimpleStringProperty(new SimpleString("name"), name); - Notification notification = new Notification(nodeUUID.toString(), NotificationType.BRIDGE_STOPPED, props); + Notification notification = new Notification(nodeUUID.toString(), CoreNotificationType.BRIDGE_STOPPED, props); try { notificationService.sendNotification(notification); @@ -672,6 +672,12 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } } } + else if (scaleDownTargetNodeID != null) + { + // the disconnected node is scaling down to me, no need to reconnect to it + HornetQServerLogger.LOGGER.debug("Received scaleDownTargetNodeID: " + scaleDownTargetNodeID + "; cancelling reconnect."); + fail(true); + } else { HornetQServerLogger.LOGGER.debug("Received invalid scaleDownTargetNodeID: " + scaleDownTargetNodeID); 
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java index ab50cf8..28ce428 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java @@ -24,7 +24,7 @@ import org.hornetq.api.core.HornetQBuffer; import org.hornetq.api.core.HornetQBuffers; import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.TransportConfiguration; -import org.hornetq.api.core.management.NotificationType; +import org.hornetq.api.core.management.CoreNotificationType; import org.hornetq.core.server.HornetQServerLogger; import org.hornetq.core.server.NodeManager; import org.hornetq.core.server.cluster.BroadcastGroup; @@ -112,7 +112,7 @@ public class BroadcastGroupImpl implements BroadcastGroup, Runnable { TypedProperties props = new TypedProperties(); props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name)); - Notification notification = new Notification(nodeManager.getNodeId().toString(), NotificationType.BROADCAST_GROUP_STARTED, props); + Notification notification = new Notification(nodeManager.getNodeId().toString(), CoreNotificationType.BROADCAST_GROUP_STARTED, props); notificationService.sendNotification(notification); } @@ -146,7 +146,7 @@ public class BroadcastGroupImpl implements BroadcastGroup, Runnable { TypedProperties props = new TypedProperties(); props.putSimpleStringProperty(new SimpleString("name"), new SimpleString(name)); - Notification notification = new Notification(nodeManager.getNodeId().toString(), NotificationType.BROADCAST_GROUP_STOPPED, 
props); + Notification notification = new Notification(nodeManager.getNodeId().toString(), CoreNotificationType.BROADCAST_GROUP_STOPPED, props); try { notificationService.sendNotification(notification); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java index 80545e3..5b5eed1 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java @@ -12,7 +12,9 @@ */ package org.hornetq.core.server.cluster.impl; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; @@ -24,8 +26,8 @@ import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.core.client.ClientConsumer; import org.hornetq.api.core.client.ClientMessage; import org.hornetq.api.core.client.ClientProducer; +import org.hornetq.api.core.management.CoreNotificationType; import org.hornetq.api.core.management.ManagementHelper; -import org.hornetq.api.core.management.NotificationType; import org.hornetq.api.core.management.ResourceNames; import org.hornetq.core.client.impl.ClientSessionFactoryInternal; import org.hornetq.core.client.impl.ServerLocatorInternal; @@ -38,6 +40,7 @@ import org.hornetq.core.server.Queue; import org.hornetq.core.server.ServerMessage; import org.hornetq.core.server.cluster.ClusterConnection; import org.hornetq.core.server.cluster.ClusterManager; +import 
org.hornetq.core.server.cluster.HornetQServerSideProtocolManagerFactory; import org.hornetq.core.server.cluster.MessageFlowRecord; import org.hornetq.core.server.cluster.Transformer; import org.hornetq.utils.UUID; @@ -50,6 +53,7 @@ import org.hornetq.utils.UUIDGenerator; * * @author tim * @author Clebert Suconic + * @author <a href="mailto:mtaylor@redhat.com">Martyn Taylor</a> */ public class ClusterConnectionBridge extends BridgeImpl { @@ -135,9 +139,9 @@ public class ClusterConnectionBridge extends BridgeImpl // we need to disable DLQ check on the clustered bridges queue.setInternalQueue(true); - if (HornetQServerLogger.LOGGER.isDebugEnabled()) + if (HornetQServerLogger.LOGGER.isTraceEnabled()) { - HornetQServerLogger.LOGGER.debug("Setting up bridge between " + clusterConnection.getConnector() + " and " + targetLocator, + HornetQServerLogger.LOGGER.trace("Setting up bridge between " + clusterConnection.getConnector() + " and " + targetLocator, new Exception("trace")); } } @@ -145,6 +149,7 @@ public class ClusterConnectionBridge extends BridgeImpl @Override protected ClientSessionFactoryInternal createSessionFactory() throws Exception { + serverLocator.setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance()); ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal) serverLocator.createSessionFactory(targetNodeID); setSessionFactory(factory); @@ -247,28 +252,26 @@ public class ClusterConnectionBridge extends BridgeImpl " AND " + ManagementHelper.HDR_NOTIFICATION_TYPE + " IN ('" + - NotificationType.BINDING_ADDED + + CoreNotificationType.BINDING_ADDED + "','" + - NotificationType.BINDING_REMOVED + + CoreNotificationType.BINDING_REMOVED + "','" + - NotificationType.CONSUMER_CREATED + + CoreNotificationType.CONSUMER_CREATED + "','" + - NotificationType.CONSUMER_CLOSED + + CoreNotificationType.CONSUMER_CLOSED + "','" + - NotificationType.PROPOSAL + + CoreNotificationType.PROPOSAL + "','" + - NotificationType.PROPOSAL_RESPONSE + + 
CoreNotificationType.PROPOSAL_RESPONSE + "','" + - NotificationType.UNPROPOSAL + + CoreNotificationType.UNPROPOSAL + "') AND " + ManagementHelper.HDR_DISTANCE + "<" + flowRecord.getMaxHops() + " AND (" + - ManagementHelper.HDR_ADDRESS + - " LIKE '" + - flowRecord.getAddress() + - "%')"); + createSelectorFromAddress(flowRecord.getAddress()) + + ")"); session.createTemporaryQueue(managementNotificationAddress, notifQueueName, filter); @@ -279,8 +282,10 @@ public class ClusterConnectionBridge extends BridgeImpl session.start(); ClientMessage message = session.createMessage(false); - - HornetQServerLogger.LOGGER.debug("Requesting sendQueueInfoToQueue through " + this, new Exception("trace")); + if (HornetQServerLogger.LOGGER.isTraceEnabled()) + { + HornetQServerLogger.LOGGER.trace("Requesting sendQueueInfoToQueue through " + this, new Exception("trace")); + } ManagementHelper.putOperationInvocation(message, ResourceNames.CORE_SERVER, "sendQueueInfoToQueue", @@ -298,6 +303,79 @@ public class ClusterConnectionBridge extends BridgeImpl } } + /** + * Takes an address filter, or a comma-separated list of address filters, and generates an appropriate JMS selector for + * filtering queues. + * @param address a single address prefix or a comma-separated list of prefixes; a leading '!' excludes addresses starting with that prefix + */ + public static String createSelectorFromAddress(String address) + { + StringBuilder stringBuilder = new StringBuilder(); + + // Support standard address (not a list) case. 
+ if (!address.contains(",")) + { + if (address.startsWith("!")) + { + stringBuilder.append(ManagementHelper.HDR_ADDRESS + " NOT LIKE '" + address.substring(1, address.length()) + "%'"); + } + else + { + stringBuilder.append(ManagementHelper.HDR_ADDRESS + " LIKE '" + address + "%'"); + } + return stringBuilder.toString(); + } + + // For comma separated lists build a JMS selector statement based on the list items + return buildSelectorFromArray(address.split(",")); + } + + public static String buildSelectorFromArray(String[] list) + { + List<String> includes = new ArrayList<String>(); + List<String> excludes = new ArrayList<String>(); + + // Split the list into addresses to match and addresses to exclude. + for (int i = 0; i < list.length; i++) + { + if (list[i].startsWith("!")) + { + excludes.add(list[i].substring(1, list[i].length())); + } + else + { + includes.add(list[i]); + } + } + + // Build the address matching part of the selector + StringBuilder builder = new StringBuilder("("); + if (includes.size() > 0) + { + if (excludes.size() > 0) builder.append("("); + for (int i = 0; i < includes.size(); i++) + { + builder.append("(" + ManagementHelper.HDR_ADDRESS + " LIKE '" + includes.get(i) + "%')"); + if (i < includes.size() - 1) builder.append(" OR "); + } + if (excludes.size() > 0) builder.append(")"); + } + + // Build the address exclusion part of the selector + if (excludes.size() > 0) + { + if (includes.size() > 0) builder.append(" AND ("); + for (int i = 0; i < excludes.size(); i++) + { + builder.append("(" + ManagementHelper.HDR_ADDRESS + " NOT LIKE '" + excludes.get(i) + "%')"); + if (i < excludes.size() - 1) builder.append(" AND "); + } + if (includes.size() > 0) builder.append(")"); + } + builder.append(")"); + return builder.toString(); + } + @Override protected void afterConnect() throws Exception { 
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java index 60f1058..3cec130 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java @@ -33,8 +33,8 @@ import org.hornetq.api.core.TransportConfiguration; import org.hornetq.api.core.client.ClientMessage; import org.hornetq.api.core.client.ClusterTopologyListener; import org.hornetq.api.core.client.TopologyMember; +import org.hornetq.api.core.management.CoreNotificationType; import org.hornetq.api.core.management.ManagementHelper; -import org.hornetq.api.core.management.NotificationType; import org.hornetq.core.client.impl.AfterConnectInternalListener; import org.hornetq.core.client.impl.ClientSessionFactoryInternal; import org.hornetq.core.client.impl.ServerLocatorImpl; @@ -45,7 +45,6 @@ import org.hornetq.core.postoffice.Binding; import org.hornetq.core.postoffice.Bindings; import org.hornetq.core.postoffice.PostOffice; import org.hornetq.core.postoffice.impl.PostOfficeImpl; -import org.hornetq.core.protocol.ServerPacketDecoder; import org.hornetq.core.server.HornetQMessageBundle; import org.hornetq.core.server.HornetQServer; import org.hornetq.core.server.HornetQServerLogger; @@ -56,6 +55,7 @@ import org.hornetq.core.server.cluster.ClusterConnection; import org.hornetq.core.server.cluster.ClusterControl; import org.hornetq.core.server.cluster.ClusterManager; import org.hornetq.core.server.cluster.ClusterManager.IncomingInterceptorLookingForExceptionMessage; +import 
org.hornetq.core.server.cluster.HornetQServerSideProtocolManagerFactory; import org.hornetq.core.server.cluster.MessageFlowRecord; import org.hornetq.core.server.cluster.RemoteQueueBinding; import org.hornetq.core.server.group.impl.Proposal; @@ -67,9 +67,6 @@ import org.hornetq.utils.ExecutorFactory; import org.hornetq.utils.FutureLatch; import org.hornetq.utils.TypedProperties; -import static org.hornetq.api.core.management.NotificationType.CONSUMER_CLOSED; -import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED; - /** * A ClusterConnectionImpl * @@ -437,7 +434,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn TypedProperties props = new TypedProperties(); props.putSimpleStringProperty(new SimpleString("name"), name); Notification notification = new Notification(nodeManager.getNodeId().toString(), - NotificationType.CLUSTER_CONNECTION_STOPPED, + CoreNotificationType.CLUSTER_CONNECTION_STOPPED, props); managementService.sendNotification(notification); } @@ -652,7 +649,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn serverLocator.setAfterConnectionInternalListener(this); - serverLocator.setPacketDecoder(ServerPacketDecoder.INSTANCE); + serverLocator.setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance()); serverLocator.start(server.getExecutorFactory().getExecutor()); } @@ -662,7 +659,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn TypedProperties props = new TypedProperties(); props.putSimpleStringProperty(new SimpleString("name"), name); Notification notification = new Notification(nodeManager.getNodeId().toString(), - NotificationType.CLUSTER_CONNECTION_STARTED, + CoreNotificationType.CLUSTER_CONNECTION_STARTED, props); HornetQServerLogger.LOGGER.debug("sending notification: " + notification); managementService.sendNotification(notification); @@ -845,7 +842,7 @@ public final class ClusterConnectionImpl 
implements ClusterConnection, AfterConn targetLocator.setAfterConnectionInternalListener(this); - targetLocator.setPacketDecoder(ServerPacketDecoder.INSTANCE); + serverLocator.setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance()); targetLocator.setNodeID(nodeId); @@ -1133,7 +1130,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn // a list of integers SimpleString type = message.getSimpleStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE); - NotificationType ntype = NotificationType.valueOf(type.toString()); + CoreNotificationType ntype = CoreNotificationType.valueOf(type.toString()); switch (ntype) { @@ -1366,7 +1363,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn return; } - RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateUniqueID(), + RemoteQueueBinding binding = new RemoteQueueBindingImpl(server.getStorageManager().generateID(), queueAddress, clusterName, routingName, @@ -1503,7 +1500,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString); } - Notification notification = new Notification(null, CONSUMER_CREATED, props); + Notification notification = new Notification(null, CoreNotificationType.CONSUMER_CREATED, props); managementService.sendNotification(notification); } @@ -1560,7 +1557,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn { props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString); } - Notification notification = new Notification(null, CONSUMER_CLOSED, props); + Notification notification = new Notification(null, CoreNotificationType.CONSUMER_CLOSED, props); managementService.sendNotification(notification); } 
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/cluster/qourum/Quorum.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/qourum/Quorum.java b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/qourum/Quorum.java index 88441a4..e5956bd 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/cluster/qourum/Quorum.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/cluster/qourum/Quorum.java @@ -15,7 +15,7 @@ package org.hornetq.core.server.cluster.qourum; import org.hornetq.core.client.impl.Topology; /** - * A quorum can be registered with the @link QuorumManager to receive notifications about the state of a cluster. + * A quorum can be registered with the {@link QuorumManager} to receive notifications about the state of a cluster. * It can then use the {@link QuorumManager} for the quorum within a cluster to vote on a specific outcome. * */ public interface Quorum
