http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/GroupHandlingAbstract.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/GroupHandlingAbstract.java b/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/GroupHandlingAbstract.java index eeb5a49..83d72cb 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/GroupHandlingAbstract.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/GroupHandlingAbstract.java @@ -19,8 +19,8 @@ import java.util.WeakHashMap; import java.util.concurrent.Executor; import org.hornetq.api.core.SimpleString; +import org.hornetq.api.core.management.CoreNotificationType; import org.hornetq.api.core.management.ManagementHelper; -import org.hornetq.api.core.management.NotificationType; import org.hornetq.core.postoffice.BindingType; import org.hornetq.core.server.HornetQServerLogger; import org.hornetq.core.server.group.GroupingHandler; @@ -110,7 +110,7 @@ public abstract class GroupHandlingAbstract implements GroupingHandler props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX); props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, address); props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance); - Notification notification = new Notification(null, NotificationType.UNPROPOSAL, props); + Notification notification = new Notification(null, CoreNotificationType.UNPROPOSAL, props); try { managementService.sendNotification(notification);
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java b/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java index 5f4ce80..8851259 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/GroupingHandlerConfiguration.java @@ -14,6 +14,7 @@ package org.hornetq.core.server.group.impl; import java.io.Serializable; +import org.hornetq.api.config.HornetQDefaultConfiguration; import org.hornetq.api.core.SimpleString; /** @@ -25,75 +26,24 @@ public final class GroupingHandlerConfiguration implements Serializable { public static final long serialVersionUID = -4600283023652477326L; - public static final int DEFAULT_TIMEOUT = 5000; - - public static final long DEFAULT_GROUP_TIMEOUT = -1; - - public static final long DEFAULT_REAPER_PERIOD = 30000; - public static final String GROUP_TIMEOUT_PROP_NAME = "org.hornetq.GroupingHandlerConfiguration.groupTimeout"; public static final String REAPER_PERIOD_PROP_NAME = "org.hornetq.GroupingHandlerConfiguration.reaperPeriod"; - private final SimpleString name; - - private final TYPE type; - - private final SimpleString address; - - private final long timeout; + private SimpleString name = null; - private long groupTimeout; + private TYPE type = null; - private final long reaperPeriod; + private SimpleString address = null; + private long timeout = HornetQDefaultConfiguration.getDefaultGroupingHandlerTimeout(); - public GroupingHandlerConfiguration(final SimpleString name, final TYPE type, final SimpleString address) - { - this(name, type, address, - GroupingHandlerConfiguration.DEFAULT_TIMEOUT, - GroupingHandlerConfiguration.DEFAULT_GROUP_TIMEOUT, - GroupingHandlerConfiguration.DEFAULT_REAPER_PERIOD); - } + private long groupTimeout = HornetQDefaultConfiguration.getDefaultGroupingHandlerGroupTimeout(); - public GroupingHandlerConfiguration(final SimpleString name, - final TYPE type, - final SimpleString address, - final int timeout) - { - this(name, type, address, timeout, - GroupingHandlerConfiguration.DEFAULT_GROUP_TIMEOUT, - GroupingHandlerConfiguration.DEFAULT_REAPER_PERIOD); - } + private long reaperPeriod = HornetQDefaultConfiguration.getDefaultGroupingHandlerReaperPeriod(); - public GroupingHandlerConfiguration(final SimpleString name, - final TYPE type, - final SimpleString address, - final int timeout, - final long groupTimeout, - final long reaperPeriod) + public GroupingHandlerConfiguration() { - this.type = type; - this.name = name; - this.address = address; - this.timeout = timeout; - if (System.getProperty(GROUP_TIMEOUT_PROP_NAME) != null) - { - this.groupTimeout = Long.parseLong(System.getProperty(GROUP_TIMEOUT_PROP_NAME)); - } - else - { - this.groupTimeout = groupTimeout; - } - - if (System.getProperty(REAPER_PERIOD_PROP_NAME) != null) - { - this.reaperPeriod = Long.parseLong(System.getProperty(REAPER_PERIOD_PROP_NAME)); - } - else - { - this.reaperPeriod = reaperPeriod; - } } public SimpleString getName() @@ -126,6 +76,42 @@ public final class GroupingHandlerConfiguration implements Serializable return reaperPeriod; } + public GroupingHandlerConfiguration setName(SimpleString name) + { + this.name = name; + return this; + } + + public GroupingHandlerConfiguration setType(TYPE type) + { + this.type = type; + return this; + } + + public GroupingHandlerConfiguration setAddress(SimpleString address) + { + this.address = address; + return this; + } + + public GroupingHandlerConfiguration setTimeout(long timeout) + { + this.timeout = timeout; + return this; + } + + public GroupingHandlerConfiguration setGroupTimeout(long groupTimeout) + { + this.groupTimeout = groupTimeout; + return this; + } + + public GroupingHandlerConfiguration setReaperPeriod(long reaperPeriod) + { + this.reaperPeriod = reaperPeriod; + return this; + } + public enum TYPE { LOCAL("LOCAL"), REMOTE("REMOTE"); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/LocalGroupingHandler.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/LocalGroupingHandler.java b/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/LocalGroupingHandler.java index a817f70..7255c9f 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/LocalGroupingHandler.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/LocalGroupingHandler.java @@ -25,8 +25,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.hornetq.api.core.SimpleString; +import org.hornetq.api.core.management.CoreNotificationType; import org.hornetq.api.core.management.ManagementHelper; -import org.hornetq.api.core.management.NotificationType; import org.hornetq.core.persistence.OperationContext; import org.hornetq.core.persistence.StorageManager; import org.hornetq.core.postoffice.BindingType; @@ -140,7 +140,7 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract { addRecord = true; groupBinding = new GroupBinding(proposal.getGroupId(), proposal.getClusterName()); - groupBinding.setId(storageManager.generateUniqueID()); + groupBinding.setId(storageManager.generateID()); List<GroupBinding> newList = new ArrayList<GroupBinding>(); List<GroupBinding> oldList = groupMap.putIfAbsent(groupBinding.getClusterName(), newList); if (oldList != null) @@ -193,7 +193,7 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX); props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, address); props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance); - Notification notification = new Notification(null, NotificationType.PROPOSAL_RESPONSE, props); + Notification notification = new Notification(null, CoreNotificationType.PROPOSAL_RESPONSE, props); managementService.sendNotification(notification); } @@ -243,7 +243,7 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract groupBindings.remove(groupBinding); try { - long tx = storageManager.generateUniqueID(); + long tx = storageManager.generateID(); storageManager.deleteGrouping(tx, groupBinding); storageManager.commitBindings(tx); } @@ -265,7 +265,7 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract { waitingForBindings = true; - //make a copy of the bindings added so far from the cluster via onNotification(). + //make a copy of the bindings added so far from the cluster via onNotification() List<SimpleString> bindingsAlreadyAdded; if (expectedBindings == null) { @@ -307,13 +307,15 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract public void onNotification(final Notification notification) { - if (notification.getType() == NotificationType.BINDING_REMOVED) + if (!(notification.getType() instanceof CoreNotificationType)) return; + + if (notification.getType() == CoreNotificationType.BINDING_REMOVED) { SimpleString clusterName = notification.getProperties() .getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME); removeGrouping(clusterName); } - else if (notification.getType() == NotificationType.BINDING_ADDED) + else if (notification.getType() == CoreNotificationType.BINDING_ADDED) { SimpleString clusterName = notification.getProperties() .getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME); @@ -426,7 +428,7 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract { if (txID < 0) { - txID = storageManager.generateUniqueID(); + txID = storageManager.generateID(); } storageManager.deleteGrouping(txID, val); } @@ -496,7 +498,7 @@ public final class LocalGroupingHandler extends GroupHandlingAbstract { if (txID < 0) { - txID = storageManager.generateUniqueID(); + txID = storageManager.generateID(); } storageManager.deleteGrouping(txID, groupBinding); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java b/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java index 1187c33..ddbef27 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java @@ -23,8 +23,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.hornetq.api.core.SimpleString; +import org.hornetq.api.core.management.CoreNotificationType; import org.hornetq.api.core.management.ManagementHelper; -import org.hornetq.api.core.management.NotificationType; import org.hornetq.core.postoffice.BindingType; import org.hornetq.core.server.HornetQMessageBundle; import org.hornetq.core.server.HornetQServerLogger; @@ -229,7 +229,7 @@ public final class RemoteGroupingHandler extends GroupHandlingAbstract props.putIntProperty(ManagementHelper.HDR_DISTANCE, 0); - return new Notification(null, NotificationType.PROPOSAL, props); + return new Notification(null, CoreNotificationType.PROPOSAL, props); } public Response getProposal(final SimpleString fullID, boolean touchTime) @@ -294,7 +294,7 @@ public final class RemoteGroupingHandler extends GroupHandlingAbstract props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX); props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, address); props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance); - Notification notification = new Notification(null, NotificationType.PROPOSAL, props); + Notification notification = new Notification(null, CoreNotificationType.PROPOSAL, props); managementService.sendNotification(notification); return null; } @@ -311,8 +311,9 @@ public final class RemoteGroupingHandler extends GroupHandlingAbstract public void onNotification(final Notification notification) { + if (!(notification.getType() instanceof CoreNotificationType)) return; // removing the groupid if the binding has been removed - if (notification.getType() == NotificationType.BINDING_REMOVED) + if (notification.getType() == CoreNotificationType.BINDING_REMOVED) { SimpleString clusterName = notification.getProperties() .getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/Activation.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/Activation.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/Activation.java new file mode 100644 index 0000000..dc39d7a --- /dev/null +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/Activation.java @@ -0,0 +1,118 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.server.impl; + +import org.hornetq.api.core.HornetQException; +import org.hornetq.core.config.Configuration; +import org.hornetq.core.paging.PagingManager; +import org.hornetq.core.persistence.StorageManager; +import org.hornetq.core.postoffice.PostOffice; +import org.hornetq.core.protocol.core.Channel; +import org.hornetq.core.protocol.core.ChannelHandler; +import org.hornetq.core.remoting.server.RemotingService; +import org.hornetq.core.replication.ReplicationManager; +import org.hornetq.core.server.HornetQServer; +import org.hornetq.core.server.NodeManager; +import org.hornetq.core.server.QueueFactory; +import org.hornetq.core.server.cluster.ha.HAManager; +import org.hornetq.core.server.cluster.ha.StandaloneHAManager; +import org.hornetq.core.server.group.GroupingHandler; +import org.hornetq.core.server.management.ManagementService; +import org.hornetq.spi.core.remoting.Acceptor; + +/** +* An activation controls the lifecycle of the server and any components specific to the Activation itself. +*/ +public abstract class Activation implements Runnable +{ + public abstract void close(boolean permanently, boolean restarting) throws Exception; + + /* + * freeze the connection but allow the Activation to over ride this and decide if any connections should be left open. + * */ + public void freezeConnections(RemotingService remotingService) + { + if (remotingService != null) + { + remotingService.freeze(null, null); + } + } + + /* + * allow the activation t ooverride this if it needs to tidy up after freezing the connection. its a different method as + * its called outside of the lock that the previous method is. + * */ + public void postConnectionFreeze() + { + } + + /* + * called before the server is closing the journals so the activation can tidy up stuff + * */ + public void preStorageClose() throws Exception + { + } + + /* + * called by the server to notify the Activation that the server is stopping + * */ + public void sendLiveIsStopping() + { + } + + /* + * called by the ha manager to notify the Activation that HA is now active + * */ + public void haStarted() + { + } + + /* + * allows the Activation to register a channel handler so it can handle any packets that are unique to the Activation + * */ + public ChannelHandler getActivationChannelHandler(Channel channel, Acceptor acceptorUsed) + { + return null; + } + + /* + * returns the HA manager used for this Activation + * */ + public HAManager getHAManager() + { + return new StandaloneHAManager(); + } + + /* + * create the Journal loader needed for this Activation. + * */ + public JournalLoader createJournalLoader(PostOffice postOffice, PagingManager pagingManager, StorageManager storageManager, QueueFactory queueFactory, NodeManager nodeManager, ManagementService managementService, GroupingHandler groupingHandler, Configuration configuration, HornetQServer parentServer) throws HornetQException + { + return new PostOfficeJournalLoader(postOffice, + pagingManager, + storageManager, + queueFactory, + nodeManager, + managementService, + groupingHandler, + configuration); + } + + /* + * todo, remove this, its only needed for JMSServerManagerImpl, it should be sought elsewhere + * */ + public ReplicationManager getReplicationManager() + { + return null; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/BackupRecoveryJournalLoader.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/BackupRecoveryJournalLoader.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/BackupRecoveryJournalLoader.java index 24a4816..021b1dc 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/BackupRecoveryJournalLoader.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/BackupRecoveryJournalLoader.java @@ -16,7 +16,6 @@ import org.hornetq.api.core.Pair; import org.hornetq.api.core.SimpleString; import org.hornetq.api.core.client.ClientSessionFactory; import org.hornetq.api.core.client.ServerLocator; -import org.hornetq.core.client.impl.ServerLocatorImpl; import org.hornetq.core.client.impl.ServerLocatorInternal; import org.hornetq.core.config.Configuration; import org.hornetq.core.journal.Journal; @@ -24,12 +23,12 @@ import org.hornetq.core.paging.PagingManager; import org.hornetq.core.persistence.GroupingInfo; import org.hornetq.core.persistence.StorageManager; import org.hornetq.core.postoffice.PostOffice; -import org.hornetq.core.protocol.ServerPacketDecoder; import org.hornetq.core.server.HornetQServer; import org.hornetq.core.server.HornetQServerLogger; import org.hornetq.core.server.NodeManager; import org.hornetq.core.server.QueueFactory; import org.hornetq.core.server.cluster.ClusterController; +import org.hornetq.core.server.cluster.HornetQServerSideProtocolManagerFactory; import org.hornetq.core.server.group.GroupingHandler; import org.hornetq.core.server.management.ManagementService; import org.hornetq.core.transaction.ResourceManager; @@ -88,7 +87,8 @@ public class BackupRecoveryJournalLoader extends PostOfficeJournalLoader public void postLoad(Journal messageJournal, ResourceManager resourceManager, Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception { ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice, nodeManager, clusterController); - ((ServerLocatorImpl)locator).setPacketDecoder(ServerPacketDecoder.INSTANCE); + locator.setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance()); + try (ClientSessionFactory sessionFactory = locator.createSessionFactory()) { scaleDownHandler.scaleDown(sessionFactory, resourceManager, duplicateIDMap, parentServer.getConfiguration().getManagementAddress(), parentServer.getNodeID()); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/ColocatedActivation.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ColocatedActivation.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ColocatedActivation.java new file mode 100644 index 0000000..92412a3 --- /dev/null +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ColocatedActivation.java @@ -0,0 +1,331 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.server.impl; + +import org.hornetq.api.core.HornetQBuffer; +import org.hornetq.api.core.Pair; +import org.hornetq.api.core.SimpleString; +import org.hornetq.core.client.impl.Topology; +import org.hornetq.core.client.impl.TopologyMemberImpl; +import org.hornetq.core.protocol.core.Channel; +import org.hornetq.core.protocol.core.ChannelHandler; +import org.hornetq.core.protocol.core.Packet; +import org.hornetq.core.protocol.core.impl.PacketImpl; +import org.hornetq.core.protocol.core.impl.wireformat.BackupRequestMessage; +import org.hornetq.core.protocol.core.impl.wireformat.BackupResponseMessage; +import org.hornetq.core.remoting.server.RemotingService; +import org.hornetq.core.replication.ReplicationManager; +import org.hornetq.core.server.cluster.ha.ColocatedHAManager; +import org.hornetq.core.server.cluster.ha.ColocatedPolicy; +import org.hornetq.core.server.cluster.ha.HAManager; +import org.hornetq.core.server.cluster.qourum.QuorumVote; +import org.hornetq.core.server.cluster.qourum.QuorumVoteHandler; +import org.hornetq.core.server.cluster.qourum.Vote; +import org.hornetq.spi.core.remoting.Acceptor; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class ColocatedActivation extends LiveActivation +{ + private static final SimpleString REQUEST_BACKUP_QUORUM_VOTE = new SimpleString("RequestBackupQuorumVote"); + + private final ColocatedHAManager colocatedHAManager; + + private final ColocatedPolicy colocatedPolicy; + + LiveActivation liveActivation; + + private final HornetQServerImpl server; + + public ColocatedActivation(HornetQServerImpl hornetQServer, ColocatedPolicy colocatedPolicy, LiveActivation liveActivation) + { + server = hornetQServer; + this.colocatedPolicy = colocatedPolicy; + this.liveActivation = liveActivation; + colocatedHAManager = new ColocatedHAManager(colocatedPolicy, server); + } + + + @Override + public void haStarted() + { + server.getClusterManager().getQuorumManager().registerQuorumHandler(new RequestBackupQuorumVoteHandler()); + //vote for a backup if required + if (colocatedPolicy.isRequestBackup()) + { + server.getClusterManager().getQuorumManager().vote(new RequestBackupQuorumVote()); + } + } + + @Override + public void freezeConnections(RemotingService remotingService) + { + liveActivation.freezeConnections(remotingService); + } + + @Override + public void postConnectionFreeze() + { + liveActivation.postConnectionFreeze(); + } + + @Override + public void preStorageClose() throws Exception + { + liveActivation.preStorageClose(); + } + + @Override + public void sendLiveIsStopping() + { + liveActivation.sendLiveIsStopping(); + } + + @Override + public ReplicationManager getReplicationManager() + { + return liveActivation.getReplicationManager(); + } + + @Override + public HAManager getHAManager() + { + return colocatedHAManager; + } + + @Override + public void run() + { + liveActivation.run(); + } + + @Override + public void close(boolean permanently, boolean restarting) throws Exception + { + liveActivation.close(permanently, restarting); + } + + @Override + public ChannelHandler getActivationChannelHandler(final Channel channel, final Acceptor acceptorUsed) + { + final ChannelHandler activationChannelHandler = liveActivation.getActivationChannelHandler(channel, acceptorUsed); + return new ChannelHandler() + { + @Override + public void handlePacket(Packet packet) + { + if (packet.getType() == PacketImpl.BACKUP_REQUEST) + { + BackupRequestMessage backupRequestMessage = (BackupRequestMessage) packet; + boolean started = false; + try + { + started = colocatedHAManager.activateBackup(backupRequestMessage.getBackupSize(), + backupRequestMessage.getJournalDirectory(), + backupRequestMessage.getBindingsDirectory(), + backupRequestMessage.getLargeMessagesDirectory(), + backupRequestMessage.getPagingDirectory(), + backupRequestMessage.getNodeID()); + } + catch (Exception e) + { + e.printStackTrace(); + } + channel.send(new BackupResponseMessage(started)); + } + else if (activationChannelHandler != null) + { + activationChannelHandler.handlePacket(packet); + } + } + }; + } + + /** + * A vote handler for incoming backup request votes + */ + private final class RequestBackupQuorumVoteHandler implements QuorumVoteHandler + { + @Override + public Vote vote(Vote vote) + { + int size = colocatedHAManager.getBackupServers().size(); + return new RequestBackupVote(size, server.getNodeID().toString(), size < colocatedPolicy.getMaxBackups()); + } + + @Override + public SimpleString getQuorumName() + { + return REQUEST_BACKUP_QUORUM_VOTE; + } + + @Override + public Vote decode(HornetQBuffer voteBuffer) + { + RequestBackupVote requestBackupVote = new RequestBackupVote(); + requestBackupVote.decode(voteBuffer); + return requestBackupVote; + } + } + /** + * a quorum vote for backup requests + */ + private final class RequestBackupQuorumVote extends QuorumVote<RequestBackupVote, Pair<String, Integer>> + { + //the available nodes that we can request + private final List<Pair<String, Integer>> nodes = new ArrayList<>(); + + public RequestBackupQuorumVote() + { + super(REQUEST_BACKUP_QUORUM_VOTE); + } + + @Override + public Vote connected() + { + return new RequestBackupVote(); + } + + @Override + public Vote notConnected() + { + return new RequestBackupVote(); + } + + @Override + public void vote(RequestBackupVote vote) + { + //if the returned vote is available add it to the nodes we can request + if (vote.backupAvailable) + { + nodes.add(vote.getVote()); + } + } + + @Override + public Pair<String, Integer> getDecision() + { + //sort the nodes by how many backups they have and choose the first + Collections.sort(nodes, new Comparator<Pair<String, Integer>>() + { + @Override + public int compare(Pair<String, Integer> o1, Pair<String, Integer> o2) + { + return o1.getB().compareTo(o2.getB()); + } + }); + return nodes.get(0); + } + + @Override + public void allVotesCast(Topology voteTopology) + { + //if we have any nodes that we can request then send a request + if (nodes.size() > 0) + { + Pair<String, Integer> decision = getDecision(); + TopologyMemberImpl member = voteTopology.getMember(decision.getA()); + try + { + boolean backupStarted = colocatedHAManager.requestBackup(member.getConnector(), decision.getB().intValue(), !colocatedPolicy.isSharedStore()); + if (!backupStarted) + { + nodes.clear(); + server.getScheduledPool().schedule(new Runnable() + { + @Override + public void run() + { + server.getClusterManager().getQuorumManager().vote(new RequestBackupQuorumVote()); + } + }, colocatedPolicy.getBackupRequestRetryInterval(), TimeUnit.MILLISECONDS); + } + } + catch (Exception e) + { + e.printStackTrace(); + //todo + } + } + else + { + nodes.clear(); + server.getScheduledPool().schedule(new Runnable() + { + @Override + public void run() + { + server.getClusterManager().getQuorumManager().vote(RequestBackupQuorumVote.this); + } + }, colocatedPolicy.getBackupRequestRetryInterval(), TimeUnit.MILLISECONDS); + } + } + + @Override + public SimpleString getName() + { + return REQUEST_BACKUP_QUORUM_VOTE; + } + } + + class RequestBackupVote extends Vote<Pair<String, Integer>> + { + private int backupsSize; + private String nodeID; + private boolean backupAvailable; + + public RequestBackupVote() + { + backupsSize = -1; + } + + public RequestBackupVote(int backupsSize, String nodeID, boolean backupAvailable) + { + this.backupsSize = backupsSize; + this.nodeID = nodeID; + this.backupAvailable = backupAvailable; + } + + @Override + public void encode(HornetQBuffer buff) + { + buff.writeInt(backupsSize); + buff.writeNullableString(nodeID); + buff.writeBoolean(backupAvailable); + } + + @Override + public void decode(HornetQBuffer buff) + { + backupsSize = buff.readInt(); + nodeID = buff.readNullableString(); + backupAvailable = buff.readBoolean(); + } + + @Override + public boolean isRequestServerVote() + { + return true; + } + + @Override + public Pair<String, Integer> getVote() + { + return new Pair<>(nodeID, backupsSize); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/ConnectorsService.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ConnectorsService.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ConnectorsService.java index 094efc9..2cc6f2d 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/ConnectorsService.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/ConnectorsService.java @@ -12,11 +12,13 @@ */ package org.hornetq.core.server.impl; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; +import org.hornetq.api.core.Pair; import org.hornetq.core.config.Configuration; import org.hornetq.core.config.ConnectorServiceConfiguration; import org.hornetq.core.persistence.StorageManager; @@ -51,48 +53,36 @@ public final class ConnectorsService implements HornetQComponent private final Set<ConnectorService> connectors = new HashSet<ConnectorService>(); + private final ServiceRegistry serviceRegistry; + public ConnectorsService(final Configuration configuration, final StorageManager storageManager, final ScheduledExecutorService scheduledPool, - final PostOffice postOffice) + final PostOffice postOffice, + final ServiceRegistry serviceRegistry) { this.configuration = configuration; this.storageManager = storageManager; this.scheduledPool = scheduledPool; this.postOffice = postOffice; + this.serviceRegistry = serviceRegistry; } public void start() throws Exception { List<ConnectorServiceConfiguration> configurationList = configuration.getConnectorServiceConfigurations(); - for (ConnectorServiceConfiguration info : configurationList) - { - ConnectorServiceFactory factory = (ConnectorServiceFactory)ClassloadingUtil.newInstanceFromClassLoader(info.getFactoryClassName()); - - if (info.getParams() != null) - { - Set<String> invalid = ConfigurationHelper.checkKeys(factory.getAllowableProperties(), info.getParams() - .keySet()); - - if (!invalid.isEmpty()) - { - HornetQServerLogger.LOGGER.connectorKeysInvalid(ConfigurationHelper.stringSetToCommaListString(invalid)); - - continue; - } - } - Set<String> invalid = ConfigurationHelper.checkKeysExist(factory.getRequiredProperties(), info.getParams() - .keySet()); + Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> connectorServiceFactories = serviceRegistry.getConnectorServices(); - if (!invalid.isEmpty()) - { - HornetQServerLogger.LOGGER.connectorKeysMissing(ConfigurationHelper.stringSetToCommaListString(invalid)); + for (Pair<ConnectorServiceFactory, ConnectorServiceConfiguration> pair : connectorServiceFactories) + { + createService(pair.getB(), pair.getA()); + } - continue; - } - ConnectorService connectorService = factory.createConnectorService(info.getConnectorName(), info.getParams(), storageManager, postOffice, scheduledPool); - connectors.add(connectorService); + for (ConnectorServiceConfiguration info : configurationList) + { + ConnectorServiceFactory factory = (ConnectorServiceFactory) ClassloadingUtil.newInstanceFromClassLoader(info.getFactoryClassName()); + createService(info, factory); } for (ConnectorService connector : connectors) @@ -109,6 +99,28 @@ public final class ConnectorsService implements HornetQComponent isStarted = true; } + public void createService(ConnectorServiceConfiguration info, ConnectorServiceFactory factory) + { + if (info.getParams() != null) + { + Set<String> invalid = ConfigurationHelper.checkKeys(factory.getAllowableProperties(), info.getParams().keySet()); + if (!invalid.isEmpty()) + { + HornetQServerLogger.LOGGER.connectorKeysInvalid(ConfigurationHelper.stringSetToCommaListString(invalid)); + return; + } + } + + Set<String> invalid = ConfigurationHelper.checkKeysExist(factory.getRequiredProperties(), info.getParams().keySet()); + if (!invalid.isEmpty()) + { + HornetQServerLogger.LOGGER.connectorKeysMissing(ConfigurationHelper.stringSetToCommaListString(invalid)); + return; + } + ConnectorService connectorService = factory.createConnectorService(info.getConnectorName(), info.getParams(), storageManager, postOffice, scheduledPool); + connectors.add(connectorService); + } + public void stop() throws Exception { if (!isStarted) http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/DivertImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/DivertImpl.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/DivertImpl.java index 627f1c1..4e63d14 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/DivertImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/DivertImpl.java @@ -85,7 +85,7 @@ public class DivertImpl implements Divert HornetQServerLogger.LOGGER.trace("Diverting message " + message + " into " + this); } - long id = storageManager.generateUniqueID(); + long id = storageManager.generateID(); ServerMessage copy = null; @@ -100,6 +100,8 @@ public class DivertImpl implements Divert copy.setAddress(forwardAddress); + copy.setExpiration(message.getExpiration()); + if (transformer != null) { copy = transformer.transform(copy);
