http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java index baca41d..ad2538e 100644 --- a/hornetq-server/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java @@ -13,13 +13,12 @@ package org.hornetq.core.server.impl; import javax.management.MBeanServer; + import java.io.File; import java.io.FilenameFilter; import java.io.PrintWriter; import java.io.StringWriter; import java.lang.management.ManagementFactory; -import java.lang.reflect.Array; -import java.nio.channels.ClosedChannelException; import java.security.AccessController; import java.security.PrivilegedAction; import java.util.ArrayList; @@ -31,8 +30,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -41,24 +38,11 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.hornetq.api.config.HornetQDefaultConfiguration; -import org.hornetq.api.core.DiscoveryGroupConfiguration; -import org.hornetq.api.core.HornetQAlreadyReplicatingException; -import org.hornetq.api.core.HornetQException; -import org.hornetq.api.core.HornetQIllegalStateException; -import org.hornetq.api.core.HornetQInternalErrorException; import org.hornetq.api.core.Pair; import org.hornetq.api.core.SimpleString; -import org.hornetq.api.core.TransportConfiguration; -import 
org.hornetq.api.core.client.ClusterTopologyListener; -import org.hornetq.api.core.client.HornetQClient; -import org.hornetq.api.core.client.TopologyMember; import org.hornetq.core.asyncio.impl.AsynchronousFileImpl; import org.hornetq.core.client.impl.ClientSessionFactoryImpl; -import org.hornetq.core.client.impl.ClientSessionFactoryInternal; -import org.hornetq.core.client.impl.ServerLocatorInternal; -import org.hornetq.core.config.BackupStrategy; import org.hornetq.core.config.BridgeConfiguration; -import org.hornetq.core.config.ClusterConnectionConfiguration; import org.hornetq.core.config.Configuration; import org.hornetq.core.config.ConfigurationUtils; import org.hornetq.core.config.CoreQueueConfiguration; @@ -94,29 +78,19 @@ import org.hornetq.core.persistence.impl.journal.JournalStorageManager; import org.hornetq.core.persistence.impl.journal.OperationContextImpl; import org.hornetq.core.persistence.impl.nullpm.NullStorageManager; import org.hornetq.core.postoffice.Binding; -import org.hornetq.core.postoffice.DuplicateIDCache; import org.hornetq.core.postoffice.PostOffice; import org.hornetq.core.postoffice.QueueBinding; import org.hornetq.core.postoffice.impl.DivertBinding; import org.hornetq.core.postoffice.impl.LocalQueueBinding; import org.hornetq.core.postoffice.impl.PostOfficeImpl; -import org.hornetq.core.protocol.ServerPacketDecoder; -import org.hornetq.core.protocol.core.Channel; -import org.hornetq.core.protocol.core.CoreRemotingConnection; -import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage; -import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage.LiveStopping; -import org.hornetq.core.remoting.CloseListener; -import org.hornetq.core.remoting.FailureListener; import org.hornetq.core.remoting.server.RemotingService; import org.hornetq.core.remoting.server.impl.RemotingServiceImpl; -import org.hornetq.core.replication.ReplicationEndpoint; import 
org.hornetq.core.replication.ReplicationManager; import org.hornetq.core.security.CheckType; import org.hornetq.core.security.Role; import org.hornetq.core.security.SecurityStore; import org.hornetq.core.security.impl.SecurityStoreImpl; import org.hornetq.core.server.ActivateCallback; -import org.hornetq.core.server.ActivationParams; import org.hornetq.core.server.Bindable; import org.hornetq.core.server.Divert; import org.hornetq.core.server.HornetQComponent; @@ -125,25 +99,20 @@ import org.hornetq.core.server.HornetQServer; import org.hornetq.core.server.HornetQServerLogger; import org.hornetq.core.server.JournalType; import org.hornetq.core.server.LargeServerMessage; -import org.hornetq.core.server.LiveNodeLocator; import org.hornetq.core.server.MemoryManager; import org.hornetq.core.server.NodeManager; import org.hornetq.core.server.Queue; import org.hornetq.core.server.QueueFactory; import org.hornetq.core.server.ServerSession; +import org.hornetq.core.server.ServerSessionFactory; import org.hornetq.core.server.cluster.BackupManager; -import org.hornetq.core.server.cluster.ClusterConnection; -import org.hornetq.core.server.cluster.ClusterControl; -import org.hornetq.core.server.cluster.ClusterController; import org.hornetq.core.server.cluster.ClusterManager; import org.hornetq.core.server.cluster.Transformer; import org.hornetq.core.server.cluster.ha.HAPolicy; -import org.hornetq.core.server.cluster.qourum.SharedNothingBackupQuorum; import org.hornetq.core.server.group.GroupingHandler; import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration; import org.hornetq.core.server.group.impl.LocalGroupingHandler; import org.hornetq.core.server.group.impl.RemoteGroupingHandler; -import org.hornetq.core.server.cluster.qourum.SharedNothingBackupQuorum.BACKUP_ACTIVATION; import org.hornetq.core.server.management.ManagementService; import org.hornetq.core.server.management.impl.ManagementServiceImpl; import org.hornetq.core.settings.HierarchicalRepository; @@ 
-165,15 +134,12 @@ import org.hornetq.utils.ReusableLatch; import org.hornetq.utils.SecurityFormatter; import org.hornetq.utils.VersionLoader; -import static org.hornetq.core.server.cluster.qourum.SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAILURE_REPLICATING; -import static org.hornetq.core.server.cluster.qourum.SharedNothingBackupQuorum.BACKUP_ACTIVATION.FAIL_OVER; -import static org.hornetq.core.server.cluster.qourum.SharedNothingBackupQuorum.BACKUP_ACTIVATION.STOP; - /** * The HornetQ server implementation * * @author <a href="mailto:[email protected]">Tim Fox</a> * @author <a href="mailto:[email protected]>Andy Taylor</a> + * @author <a href="mailto:[email protected]>Martyn Taylor</a> */ public class HornetQServerImpl implements HornetQServer { @@ -187,6 +153,8 @@ public class HornetQServerImpl implements HornetQServer */ public static final String GENERIC_IGNORED_FILTER = "__HQX=-1"; + private HAPolicy haPolicy; + enum SERVER_STATE { /** @@ -276,20 +244,6 @@ public class HornetQServerImpl implements HornetQServer */ private final ReusableLatch activationLatch = new ReusableLatch(0); - private final ReusableLatch backupSyncLatch = new ReusableLatch(0); - - private final Object replicationLock = new Object(); - - /** - * Only applicable to 'remote backup servers'. If this flag is false the backup may not become - * 'live'. 
- */ - private volatile boolean backupUpToDate = true; - - private ReplicationManager replicationManager; - - private ReplicationEndpoint replicationEndpoint; - private final Set<ActivateCallback> activateCallbacks = new ConcurrentHashSet<ActivateCallback>(); private volatile GroupingHandler groupingHandler; @@ -307,16 +261,16 @@ public class HornetQServerImpl implements HornetQServer private final ShutdownOnCriticalErrorListener shutdownOnCriticalIO = new ShutdownOnCriticalErrorListener(); - private final Object failbackCheckerGuard = new Object(); - private boolean cancelFailBackChecker; - private final HornetQServer parentServer; - private ClientSessionFactoryInternal scaleDownClientSessionFactory = null; + //todo think about moving this to the activation + private final List<SimpleString> scaledDownNodeIDs = new ArrayList<>(); + + private boolean threadPoolSupplied = false; - private ServerLocatorInternal scaleDownServerLocator = null; + private boolean scheduledPoolSupplied = false; - private final List<SimpleString> scaledDownNodeIDs = new ArrayList<>(); + private ServiceRegistry serviceRegistry; // Constructors // --------------------------------------------------------------------------------- @@ -358,11 +312,19 @@ public class HornetQServerImpl implements HornetQServer final HornetQSecurityManager securityManager, final HornetQServer parentServer) { + this(configuration, mbeanServer, securityManager, parentServer, null); + } + + public HornetQServerImpl(Configuration configuration, + MBeanServer mbeanServer, + final HornetQSecurityManager securityManager, + final HornetQServer parentServer, + final ServiceRegistry serviceRegistry) + { if (configuration == null) { configuration = new ConfigurationImpl(); } - if (mbeanServer == null) { // Just use JVM mbean server @@ -389,6 +351,7 @@ public class HornetQServerImpl implements HornetQServer this.parentServer = parentServer; + this.serviceRegistry = serviceRegistry == null ? 
new ServiceRegistry() : serviceRegistry; } // life-cycle methods @@ -397,7 +360,7 @@ public class HornetQServerImpl implements HornetQServer /* * Can be overridden for tests */ - protected NodeManager createNodeManager(final String directory, String nodeGroupName, boolean replicatingBackup) + protected NodeManager createNodeManager(final String directory, boolean replicatingBackup) { NodeManager manager; if (!configuration.isPersistenceEnabled()) @@ -412,7 +375,6 @@ public class HornetQServerImpl implements HornetQServer { manager = new FileLockNodeManager(directory, replicatingBackup, configuration.getJournalLockAcquisitionTimeout()); } - manager.setNodeGroupName(nodeGroupName); return manager; } @@ -423,11 +385,13 @@ public class HornetQServerImpl implements HornetQServer HornetQServerLogger.LOGGER.debug("Server already started!"); return; } - synchronized (failbackCheckerGuard) + + state = SERVER_STATE.STARTING; + + if (haPolicy == null) { - cancelFailBackChecker = false; + haPolicy = ConfigurationUtils.getHAPolicy(configuration.getHAPolicyConfiguration()); } - state = SERVER_STATE.STARTING; activationLatch.setCount(1); @@ -439,12 +403,11 @@ public class HornetQServerImpl implements HornetQServer { checkJournalDirectory(); - nodeManager = - createNodeManager(configuration.getJournalDirectory(), configuration.getHAPolicy().getBackupGroupName(), false); + nodeManager = createNodeManager(configuration.getJournalDirectory(), false); nodeManager.start(); - HornetQServerLogger.LOGGER.serverStarting((configuration.getHAPolicy().isBackup() ? "backup" : "live"), configuration); + HornetQServerLogger.LOGGER.serverStarting((haPolicy.isBackup() ? 
"backup" : "live"), configuration); if (configuration.isRunSyncSpeedTest()) { @@ -453,38 +416,24 @@ public class HornetQServerImpl implements HornetQServer test.run(); } - final boolean wasLive = !configuration.getHAPolicy().isBackup(); - if (!configuration.getHAPolicy().isBackup()) + final boolean wasLive = !haPolicy.isBackup(); + if (!haPolicy.isBackup()) { - if (configuration.getHAPolicy().isSharedStore() && configuration.isPersistenceEnabled()) - { - activation = new SharedStoreLiveActivation(); - } - else - { - activation = new SharedNothingLiveActivation(); - } + activation = haPolicy.createActivation(this, false, activationParams, shutdownOnCriticalIO); activation.run(); } // The activation on fail-back may change the value of isBackup, for that reason we are // checking again here - if (configuration.getHAPolicy().isBackup()) + if (haPolicy.isBackup()) { - if (configuration.getHAPolicy().isSharedStore()) + if (haPolicy.isSharedStore()) { - activation = new SharedStoreBackupActivation(); + activation = haPolicy.createActivation(this, false, activationParams, shutdownOnCriticalIO); } else { - assert replicationEndpoint == null; - nodeManager.stop(); - nodeManager = - createNodeManager(configuration.getJournalDirectory(), configuration.getHAPolicy().getBackupGroupName(), true); - backupUpToDate = false; - backupSyncLatch.setCount(1); - replicationEndpoint = new ReplicationEndpoint(this, shutdownOnCriticalIO, wasLive); - activation = new SharedNothingBackupActivation(wasLive); + activation = haPolicy.createActivation(this, wasLive, activationParams, shutdownOnCriticalIO); } backupActivationThread = new Thread(activation, HornetQMessageBundle.BUNDLE.activationForServer(this)); @@ -497,7 +446,7 @@ public class HornetQServerImpl implements HornetQServer identity != null ? 
identity : ""); } // start connector service - connectorsService = new ConnectorsService(configuration, storageManager, scheduledPool, postOffice); + connectorsService = new ConnectorsService(configuration, storageManager, scheduledPool, postOffice, serviceRegistry); connectorsService.start(); } finally @@ -520,6 +469,74 @@ public class HornetQServerImpl implements HornetQServer super.finalize(); } + public void setState(SERVER_STATE state) + { + this.state = state; + } + + public SERVER_STATE getState() + { + return state; + } + + public void interrupBackupThread(NodeManager nodeManagerInUse) throws InterruptedException + { + long timeout = 30000; + + long start = System.currentTimeMillis(); + + while (backupActivationThread.isAlive() && System.currentTimeMillis() - start < timeout) + { + if (nodeManagerInUse != null) + { + nodeManagerInUse.interrupt(); + } + + backupActivationThread.interrupt(); + + backupActivationThread.join(1000); + + } + + if (System.currentTimeMillis() - start >= timeout) + { + threadDump("Timed out waiting for backup activation to exit"); + } + } + + public void resetNodeManager() throws Exception + { + nodeManager.stop(); + nodeManager = + createNodeManager(configuration.getJournalDirectory(), true); + } + + public Activation getActivation() + { + return activation; + } + + @Override + public HAPolicy getHAPolicy() + { + return haPolicy; + } + + @Override + public void setHAPolicy(HAPolicy haPolicy) + { + this.haPolicy = haPolicy; + } + + public ExecutorService getThreadPool() + { + return threadPool; + } + + public void setActivation(SharedNothingLiveActivation activation) + { + this.activation = activation; + } /** * Stops the server in a different thread. 
*/ @@ -533,7 +550,7 @@ public class HornetQServerImpl implements HornetQServer { try { - stop(configuration.getHAPolicy().isFailoverOnServerShutdown(), criticalIOError, false); + stop(false, criticalIOError, false); } catch (Exception e) { @@ -545,7 +562,7 @@ public class HornetQServerImpl implements HornetQServer public final void stop() throws Exception { - stop(configuration.getHAPolicy().isFailoverOnServerShutdown()); + stop(false); } public void addActivationParam(String key, Object val) @@ -593,22 +610,10 @@ public class HornetQServerImpl implements HornetQServer /** * Stops the server - * - * @param failoverOnServerShutdown whether we will allow a backup server to become live when the - * server is stopped normally * @param criticalIOError whether we have encountered an IO error with the journal etc - * @param failingBack if true don't set the flag to stop the failback checker */ - private void stop(boolean failoverOnServerShutdown, final boolean criticalIOError, boolean failingBack) throws Exception + void stop(boolean failoverOnServerShutdown, final boolean criticalIOError, boolean restarting) throws Exception { - if (!failingBack) - { - synchronized (failbackCheckerGuard) - { - cancelFailBackChecker = true; - } - } - synchronized (this) { if (state == SERVER_STATE.STOPPED || state == SERVER_STATE.STOPPING) @@ -617,22 +622,8 @@ public class HornetQServerImpl implements HornetQServer } state = SERVER_STATE.STOPPING; - final ReplicationManager localReplicationManager = getReplicationManager(); + activation.sendLiveIsStopping(); - if (localReplicationManager != null) - { - replicationManager.sendLiveIsStopping(LiveStopping.STOP_CALLED); - // Schedule for 10 seconds - // this pool gets a 'hard' shutdown, no need to manage the Future of this Runnable. 
- scheduledPool.schedule(new Runnable() - { - @Override - public void run() - { - localReplicationManager.clearReplicationTokens(); - } - }, 30, TimeUnit.SECONDS); - } stopComponent(connectorsService); // we stop the groupingHandler before we stop the cluster manager so binding mappings @@ -644,27 +635,10 @@ public class HornetQServerImpl implements HornetQServer } stopComponent(clusterManager); - // connect to the scale-down target first so that when we freeze/disconnect the clients we can tell them where - // we're sending the messages - if (configuration.getHAPolicy().isScaleDown()) - { - connectToScaleDownTarget(); - } freezeConnections(); } - if (configuration.getHAPolicy().isScaleDown() && scaleDownClientSessionFactory != null) - { - try - { - scaleDown(); - } - finally - { - scaleDownClientSessionFactory.close(); - scaleDownServerLocator.close(); - } - } + activation.postConnectionFreeze(); closeAllServerSessions(criticalIOError); @@ -693,12 +667,8 @@ public class HornetQServerImpl implements HornetQServer stopComponent(deploymentManager); } - if (managementService != null) - managementService.unregisterServer(); - stopComponent(backupManager); - stopComponent(managementService); - stopComponent(replicationEndpoint); // applies to a "backup" server + activation.preStorageClose(); stopComponent(pagingManager); if (storageManager != null) @@ -709,12 +679,17 @@ public class HornetQServerImpl implements HornetQServer if (remotingService != null) remotingService.stop(criticalIOError); + // Stop the management service after the remoting service to ensure all acceptors are deregistered with JMX + if (managementService != null) + managementService.unregisterServer(); + stopComponent(managementService); + stopComponent(securityManager); stopComponent(resourceManager); stopComponent(postOffice); - if (scheduledPool != null) + if (scheduledPool != null && !scheduledPoolSupplied) { // we just interrupt all running tasks, these are supposed to be pings and the like. 
scheduledPool.shutdownNow(); @@ -722,7 +697,7 @@ public class HornetQServerImpl implements HornetQServer stopComponent(memoryManager); - if (threadPool != null) + if (threadPool != null && !threadPoolSupplied) { threadPool.shutdown(); try @@ -742,26 +717,21 @@ public class HornetQServerImpl implements HornetQServer } } - scheduledPool = null; - threadPool = null; + if (!threadPoolSupplied) threadPool = null; + if (!scheduledPoolSupplied) scheduledPool = null; if (securityStore != null) securityStore.stop(); - threadPool = null; - - scheduledPool = null; - pagingManager = null; securityStore = null; resourceManager = null; - replicationManager = null; - replicationEndpoint = null; postOffice = null; queueFactory = null; resourceManager = null; messagingServerControl = null; memoryManager = null; + backupManager = null; sessions.clear(); @@ -773,7 +743,7 @@ public class HornetQServerImpl implements HornetQServer SimpleString tempNodeID = getNodeID(); if (activation != null) { - activation.close(failoverOnServerShutdown); + activation.close(failoverOnServerShutdown, restarting); } if (backupActivationThread != null) { @@ -807,72 +777,7 @@ public class HornetQServerImpl implements HornetQServer } } - public long scaleDown() throws Exception - { - ScaleDownHandler scaleDownHandler = new ScaleDownHandler(pagingManager, postOffice, nodeManager, clusterManager.getClusterController()); - ConcurrentMap<SimpleString, DuplicateIDCache> duplicateIDCaches = ((PostOfficeImpl) postOffice).getDuplicateIDCaches(); - Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<>(); - for (SimpleString address : duplicateIDCaches.keySet()) - { - DuplicateIDCache duplicateIDCache = postOffice.getDuplicateIDCache(address); - duplicateIDMap.put(address, duplicateIDCache.getMap()); - } - return scaleDownHandler.scaleDown(scaleDownClientSessionFactory, resourceManager, duplicateIDMap, configuration.getManagementAddress(), null); - } - public void connectToScaleDownTarget() - { 
- try - { - scaleDownServerLocator = clusterManager.getHAManager().getScaleDownConnector(); - //use a Node Locator to connect to the cluster - scaleDownServerLocator.setPacketDecoder(ServerPacketDecoder.INSTANCE); - LiveNodeLocator nodeLocator = clusterManager.getHAManager().getHAPolicy().getScaleDownGroupName() == null ? - new AnyLiveNodeLocatorForScaleDown(HornetQServerImpl.this) : - new NamedLiveNodeLocatorForScaleDown(clusterManager.getHAManager().getHAPolicy().getScaleDownGroupName(), HornetQServerImpl.this); - scaleDownServerLocator.addClusterTopologyListener(nodeLocator); - - nodeLocator.connectToCluster(scaleDownServerLocator); - // a timeout is necessary here in case we use a NamedLiveNodeLocatorForScaleDown and there's no matching node in the cluster - // should the timeout be configurable? - nodeLocator.locateNode(HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT); - ClientSessionFactoryInternal clientSessionFactory = null; - while (clientSessionFactory == null) - { - Pair<TransportConfiguration, TransportConfiguration> possibleLive = null; - try - { - possibleLive = nodeLocator.getLiveConfiguration(); - if (possibleLive == null) // we've tried every connector - break; - clientSessionFactory = (ClientSessionFactoryInternal) scaleDownServerLocator.createSessionFactory(possibleLive.getA(), 0, false); - } - catch (Exception e) - { - HornetQServerLogger.LOGGER.trace("Failed to connect to " + possibleLive.getA()); - nodeLocator.notifyRegistrationFailed(false); - if (clientSessionFactory != null) - { - clientSessionFactory.close(); - } - clientSessionFactory = null; - // should I try the backup (i.e. getB()) from possibleLive? 
- } - } - if (clientSessionFactory != null) - { - scaleDownClientSessionFactory = clientSessionFactory; - } - else - { - throw new HornetQException("Unable to connect to server for scale-down"); - } - } - catch (Exception e) - { - HornetQServerLogger.LOGGER.failedToScaleDown(e); - } - } public boolean checkLiveIsNotColocated(String nodeId) { @@ -894,17 +799,7 @@ public class HornetQServerImpl implements HornetQServer */ private void freezeConnections() { - ReplicationManager localReplicationManager = getReplicationManager(); - TransportConfiguration tc = scaleDownClientSessionFactory == null ? null : scaleDownClientSessionFactory.getConnectorConfiguration(); - String nodeID = tc == null ? null : scaleDownClientSessionFactory.getServerLocator().getTopology().getMember(tc).getNodeId(); - if (remotingService != null && localReplicationManager != null) - { - remotingService.freeze(nodeID, localReplicationManager.getBackupTransportConnection()); - } - else if (remotingService != null) - { - remotingService.freeze(nodeID, null); - } + activation.freezeConnections(remotingService); // after disconnecting all the clients close all the server sessions so any messages in delivery will be cancelled back to the queue for (ServerSession serverSession : sessions.values()) @@ -962,7 +857,7 @@ public class HornetQServerImpl implements HornetQServer } - private static void stopComponent(HornetQComponent component) throws Exception + static void stopComponent(HornetQComponent component) throws Exception { if (component != null) component.stop(); @@ -1134,7 +1029,8 @@ public class HornetQServerImpl implements HornetQServer final boolean preAcknowledge, final boolean xa, final String defaultAddress, - final SessionCallback callback) throws Exception + final SessionCallback callback, + final ServerSessionFactory sessionFactory) throws Exception { if (securityStore != null) @@ -1142,16 +1038,42 @@ public class HornetQServerImpl implements HornetQServer 
securityStore.authenticate(username, password); } final OperationContext context = storageManager.newContext(getExecutorFactory().getExecutor()); - final ServerSessionImpl session = internalCreateSession(name, username, password, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context); + final ServerSessionImpl session = internalCreateSession(name, username, password, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, sessionFactory); sessions.put(name, session); return session; } - protected ServerSessionImpl internalCreateSession(String name, String username, String password, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback, OperationContext context) throws Exception + protected ServerSessionImpl internalCreateSession(String name, String username, String password, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback, OperationContext context, ServerSessionFactory sessionFactory) throws Exception { - return new ServerSessionImpl(name, + if (sessionFactory == null) + { + return new ServerSessionImpl(name, + username, + password, + minLargeMessageSize, + autoCommitSends, + autoCommitAcks, + preAcknowledge, + configuration.isPersistDeliveryCountBeforeDelivery(), + xa, + connection, + storageManager, + postOffice, + resourceManager, + securityStore, + managementService, + this, + configuration.getManagementAddress(), + defaultAddress == null ? 
null + : new SimpleString(defaultAddress), + callback, + context); + } + else + { + return sessionFactory.createCoreSession(name, username, password, minLargeMessageSize, @@ -1172,9 +1094,11 @@ public class HornetQServerImpl implements HornetQServer : new SimpleString(defaultAddress), callback, context); + } } - protected SecurityStore getSecurityStore() + @Override + public SecurityStore getSecurityStore() { return securityStore; } @@ -1233,18 +1157,6 @@ public class HornetQServerImpl implements HornetQServer return activationLatch.await(timeout, unit); } - @Override - public boolean waitForBackupSync(long timeout, TimeUnit unit) throws InterruptedException - { - if (configuration.getHAPolicy().getPolicyType() == HAPolicy.POLICY_TYPE.BACKUP_REPLICATED) - { - return backupSyncLatch.await(timeout, unit); - } - else - { - return true; - } - } public HornetQServerControlImpl getHornetQServerControl() { @@ -1440,17 +1352,9 @@ public class HornetQServerImpl implements HornetQServer return groupingHandler; } - public ReplicationEndpoint getReplicationEndpoint() - { - return replicationEndpoint; - } - public ReplicationManager getReplicationManager() { - synchronized (replicationLock) - { - return replicationManager; - } + return activation.getReplicationManager(); } public ConnectorsService getConnectorsService() @@ -1505,7 +1409,7 @@ public class HornetQServerImpl implements HornetQServer postOffice, storageManager); - Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress, divert); + Binding binding = new DivertBinding(storageManager.generateID(), sAddress, divert); postOffice.addBinding(binding); @@ -1575,7 +1479,7 @@ public class HornetQServerImpl implements HornetQServer { return new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, configuration.getPagingDirectory(), - configuration.getJournalBufferSize_NIO(), + configuration.getJournalBufferTimeout_NIO(), scheduledPool, executorFactory, 
configuration.isJournalSyncNonTransactional(), @@ -1628,6 +1532,60 @@ public class HornetQServerImpl implements HornetQServer } } + private void callActivationCompleteCallbacks() + { + for (ActivateCallback callback : activateCallbacks) + { + callback.activationComplete(); + } + } + + /** + * Sets up HornetQ Executor Services. + */ + private void initializeExecutorServices() + { + /* We check to see if a Thread Pool is supplied in the InjectedObjectRegistry. If so we created a new Ordered + * Executor based on the provided Thread pool. Otherwise we create a new ThreadPool. + */ + if (serviceRegistry.getExecutorService() == null) + { + ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-server-" + this.toString(), false, getThisClassLoader()); + if (configuration.getThreadPoolMaxSize() == -1) + { + threadPool = Executors.newCachedThreadPool(tFactory); + } + else + { + threadPool = Executors.newFixedThreadPool(configuration.getThreadPoolMaxSize(), tFactory); + } + } + else + { + threadPool = serviceRegistry.getExecutorService(); + this.threadPoolSupplied = true; + } + this.executorFactory = new OrderedExecutorFactory(threadPool); + + /* We check to see if a Scheduled Executor Service is provided in the InjectedObjectRegistry. If so we use this + * Scheduled ExecutorService otherwise we create a new one. + */ + if (serviceRegistry.getScheduledExecutorService() == null) + { + ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-scheduled-threads", false, getThisClassLoader()); + scheduledPool = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), tFactory); + } + else + { + this.scheduledPoolSupplied = true; + this.scheduledPool = serviceRegistry.getScheduledExecutorService(); + } + } + + public ServiceRegistry getServiceRegistry() + { + return serviceRegistry; + } /** * Starts everything apart from RemotingService and loading the data. 
@@ -1636,16 +1594,13 @@ public class HornetQServerImpl implements HornetQServer * {@link #initialisePart2(boolean)}. * @param scalingDown */ - private synchronized boolean initialisePart1(boolean scalingDown) throws Exception + synchronized boolean initialisePart1(boolean scalingDown) throws Exception { if (state == SERVER_STATE.STOPPED) return false; - // Create the pools - we have two pools - one for non scheduled - and another for scheduled - - ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-server-" + this.toString(), - false, - getThisClassLoader()); + // Create the pools - we have two pools - one for non scheduled - and another for scheduled + initializeExecutorServices(); if (configuration.getJournalType() == JournalType.ASYNCIO && !AIOSequentialFileFactory.isSupported()) { @@ -1653,21 +1608,6 @@ public class HornetQServerImpl implements HornetQServer configuration.setJournalType(JournalType.NIO); } - if (configuration.getThreadPoolMaxSize() == -1) - { - threadPool = Executors.newCachedThreadPool(tFactory); - } - else - { - threadPool = Executors.newFixedThreadPool(configuration.getThreadPoolMaxSize(), tFactory); - } - - executorFactory = new OrderedExecutorFactory(threadPool); - scheduledPool = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), - new HornetQThreadFactory("HornetQ-scheduled-threads", - false, - getThisClassLoader())); - managementService = new ManagementServiceImpl(mbeanServer, configuration); if (configuration.getMemoryMeasureInterval() != -1) @@ -1726,13 +1666,20 @@ public class HornetQServerImpl implements HornetQServer // This can't be created until node id is set clusterManager = new ClusterManager(executorFactory, this, postOffice, scheduledPool, managementService, configuration, - nodeManager, configuration.getHAPolicy().isBackup()); + nodeManager, haPolicy.isBackup()); backupManager = new BackupManager(this, executorFactory, scheduledPool, nodeManager, configuration, clusterManager); 
clusterManager.deploy(); - remotingService = new RemotingServiceImpl(clusterManager, configuration, this, managementService, scheduledPool, protocolManagerFactories); + remotingService = new RemotingServiceImpl(clusterManager, + configuration, + this, + managementService, + scheduledPool, + protocolManagerFactories, + executorFactory.getExecutor(), + serviceRegistry); messagingServerControl = managementService.registerServer(postOffice, storageManager, @@ -1745,7 +1692,7 @@ public class HornetQServerImpl implements HornetQServer queueFactory, scheduledPool, pagingManager, - configuration.getHAPolicy().isBackup()); + haPolicy.isBackup()); // Address settings need to deployed initially, since they're require on paging manager.start() @@ -1802,7 +1749,7 @@ public class HornetQServerImpl implements HornetQServer /* * Load the data, and start remoting service so clients can connect */ - private synchronized void initialisePart2(boolean scalingDown) throws Exception + synchronized void initialisePart2(boolean scalingDown) throws Exception { // Load the journal and populate queues, transactions and caches in memory @@ -1813,7 +1760,7 @@ public class HornetQServerImpl implements HornetQServer pagingManager.reloadStores(); - JournalLoadInformation[] journalInfo = loadJournals(scalingDown); + JournalLoadInformation[] journalInfo = loadJournals(); final ServerInfo dumper = new ServerInfo(this, pagingManager); @@ -1891,6 +1838,8 @@ public class HornetQServerImpl implements HornetQServer { activationLatch.countDown(); } + + callActivationCompleteCallbacks(); } private void deploySecurityFromConfiguration() @@ -1921,35 +1870,18 @@ public class HornetQServerImpl implements HornetQServer } } - private JournalLoadInformation[] loadJournals(boolean scalingDown) throws Exception + private JournalLoadInformation[] loadJournals() throws Exception { - JournalLoader journalLoader; + JournalLoader journalLoader = activation.createJournalLoader(postOffice, + pagingManager, + 
storageManager, + queueFactory, + nodeManager, + managementService, + groupingHandler, + configuration, + parentServer); - if (scalingDown) - { - journalLoader = new BackupRecoveryJournalLoader(postOffice, - pagingManager, - storageManager, - queueFactory, - nodeManager, - managementService, - groupingHandler, - configuration, - parentServer, - clusterManager.getHAManager().getScaleDownConnector(), - clusterManager.getClusterController()); - } - else - { - journalLoader = new PostOfficeJournalLoader(postOffice, - pagingManager, - storageManager, - queueFactory, - nodeManager, - managementService, - groupingHandler, - configuration); - } JournalLoadInformation[] journalInfo = new JournalLoadInformation[2]; List<QueueBindingInfo> queueBindingInfos = new ArrayList(); @@ -2063,8 +1995,8 @@ public class HornetQServerImpl implements HornetQServer Filter filter = FilterImpl.createFilter(filterString); - long txID = storageManager.generateUniqueID(); - long queueID = storageManager.generateUniqueID(); + long txID = storageManager.generateID(); + long queueID = storageManager.generateID(); PageSubscription pageSubscription; @@ -2212,7 +2144,7 @@ public class HornetQServerImpl implements HornetQServer /** * Check if journal directory exists or create it (if configured to do so) */ - private void checkJournalDirectory() + void checkJournalDirectory() { File journalDir = new File(configuration.getJournalDirectory()); @@ -2229,927 +2161,46 @@ public class HornetQServerImpl implements HornetQServer } } - /** - * To be called by backup trying to fail back the server - */ - private void startFailbackChecker() - { - scheduledPool.scheduleAtFixedRate(new FailbackChecker(), 1000L, 1000L, TimeUnit.MILLISECONDS); - } // Inner classes // -------------------------------------------------------------------------------- - private class FailbackChecker implements Runnable + + + public final class ShutdownOnCriticalErrorListener implements IOCriticalErrorListener { - private boolean 
restarting = false; + boolean failedAlready = false; - public void run() + public synchronized void onIOException(Exception cause, String message, SequentialFile file) { - try - { - if (!restarting && nodeManager.isAwaitingFailback()) - { - HornetQServerLogger.LOGGER.awaitFailBack(); - restarting = true; - Thread t = new Thread(new Runnable() - { - public void run() - { - try - { - HornetQServerLogger.LOGGER.debug(HornetQServerImpl.this + "::Stopping live node in favor of failback"); - stop(true, false, true); - // We need to wait some time before we start the backup again - // otherwise we may eventually start before the live had a chance to get it - Thread.sleep(configuration.getHAPolicy().getFailbackDelay()); - synchronized (failbackCheckerGuard) - { - if (cancelFailBackChecker) - return; - if (configuration.getHAPolicy().isSharedStore()) - { - configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_SHARED_STORE); - } - else - { - configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_REPLICATED); - } - HornetQServerLogger.LOGGER.debug(HornetQServerImpl.this + - "::Starting backup node now after failback"); - start(); - } - } - catch (Exception e) - { - HornetQServerLogger.LOGGER.serverRestartWarning(); - } - } - }); - t.start(); - } - } - catch (Exception e) + if (!failedAlready) { - HornetQServerLogger.LOGGER.serverRestartWarning(e); + failedAlready = true; + + HornetQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause); + + stopTheServer(true); } } } - private final class SharedStoreLiveActivation implements Activation + + /** + * This seems duplicate code all over the place, but for security reasons we can't let something like this to be open in a + * utility class, as it would be a door to load anything you like in a safe VM. + * For that reason any class trying to do a privileged block should do with the AccessController directly. 
+ */ + private static Object safeInitNewInstance(final String className) { - public void run() + return AccessController.doPrivileged(new PrivilegedAction<Object>() { - try - { - HornetQServerLogger.LOGGER.awaitingLiveLock(); - - checkJournalDirectory(); - - if (HornetQServerLogger.LOGGER.isDebugEnabled()) - { - HornetQServerLogger.LOGGER.debug("First part initialization on " + this); - } - - if (!initialisePart1(false)) - return; - - if (nodeManager.isBackupLive()) - { - /* - * looks like we've failed over at some point need to inform that we are the backup - * so when the current live goes down they failover to us - */ - if (HornetQServerLogger.LOGGER.isDebugEnabled()) - { - HornetQServerLogger.LOGGER.debug("announcing backup to the former live" + this); - } - backupManager.start(); - backupManager.announceBackup(); - Thread.sleep(configuration.getHAPolicy().getFailbackDelay()); - } - - nodeManager.startLiveNode(); - - if (state == SERVER_STATE.STOPPED || state == SERVER_STATE.STOPPING) - { - return; - } - - initialisePart2(false); - - HornetQServerLogger.LOGGER.serverIsLive(); - } - catch (Exception e) - { - HornetQServerLogger.LOGGER.initializationError(e); - } - } - - public void close(boolean permanently) throws Exception - { - // TO avoid a NPE from stop - NodeManager nodeManagerInUse = nodeManager; - - if (nodeManagerInUse != null) - { - if (permanently) - { - nodeManagerInUse.crashLiveServer(); - } - else - { - nodeManagerInUse.pauseLiveServer(); - } - } - } - } - - private final class SharedStoreBackupActivation implements Activation - { - public void run() - { - try - { - nodeManager.startBackup(); - - boolean scalingDown = configuration.getHAPolicy().getBackupStrategy() == BackupStrategy.SCALE_DOWN; - - if (!initialisePart1(scalingDown)) - return; - - backupManager.start(); - - state = SERVER_STATE.STARTED; - - HornetQServerLogger.LOGGER.backupServerStarted(version.getFullVersion(), nodeManager.getNodeId()); - - nodeManager.awaitLiveNode(); - - 
configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.SHARED_STORE); - - backupManager.activated(); - if (state != SERVER_STATE.STARTED) - { - return; - } - - initialisePart2(scalingDown); - - if (scalingDown) - { - HornetQServerLogger.LOGGER.backupServerScaledDown(); - Thread t = new Thread(new Runnable() - { - @Override - public void run() - { - try - { - stop(); - //we are shared store but if we were started by a parent server then we shouldn't restart - if (configuration.getHAPolicy().isRestartBackup()) - { - start(); - } - } - catch (Exception e) - { - HornetQServerLogger.LOGGER.serverRestartWarning(); - } - } - }); - t.start(); - return; - } - else - { - HornetQServerLogger.LOGGER.backupServerIsLive(); - - nodeManager.releaseBackup(); - } - if (configuration.getHAPolicy().isAllowAutoFailBack()) - { - startFailbackChecker(); - } - } - catch (InterruptedException e) - { - // this is ok, we are being stopped - } - catch (ClosedChannelException e) - { - // this is ok too, we are being stopped - } - catch (Exception e) - { - if (!(e.getCause() instanceof InterruptedException)) - { - HornetQServerLogger.LOGGER.initializationError(e); - } - } - catch (Throwable e) - { - HornetQServerLogger.LOGGER.initializationError(e); - } - } - - public void close(boolean permanently) throws Exception - { - - // To avoid a NPE cause by the stop - NodeManager nodeManagerInUse = nodeManager; - - if (configuration.getHAPolicy().isBackup()) - { - long timeout = 30000; - - long start = System.currentTimeMillis(); - - while (backupActivationThread.isAlive() && System.currentTimeMillis() - start < timeout) - { - if (nodeManagerInUse != null) - { - nodeManagerInUse.interrupt(); - } - - backupActivationThread.interrupt(); - - backupActivationThread.join(1000); - - } - - if (System.currentTimeMillis() - start >= timeout) - { - threadDump("Timed out waiting for backup activation to exit"); - } - - if (nodeManagerInUse != null) - { - nodeManagerInUse.stopBackup(); - } - } - else - { 
- - if (nodeManagerInUse != null) - { - // if we are now live, behave as live - // We need to delete the file too, otherwise the backup will failover when we shutdown or if the backup is - // started before the live - if (permanently) - { - nodeManagerInUse.crashLiveServer(); - } - else - { - nodeManagerInUse.pauseLiveServer(); - } - } - } - } - } - - private final class ShutdownOnCriticalErrorListener implements IOCriticalErrorListener - { - boolean failedAlready = false; - - public synchronized void onIOException(Exception cause, String message, SequentialFile file) - { - if (!failedAlready) - { - failedAlready = true; - - HornetQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause); - - stopTheServer(true); - } - } - } - - private interface Activation extends Runnable - { - void close(boolean permanently) throws Exception; - } - - private final class SharedNothingBackupActivation implements Activation - { - SharedNothingBackupQuorum backupQuorum; - private final boolean attemptFailBack; - private String nodeID; - ClusterControl clusterControl; - private boolean closed; - - public SharedNothingBackupActivation(boolean attemptFailBack) - { - this.attemptFailBack = attemptFailBack; - } - - public void run() - { - try - { - synchronized (HornetQServerImpl.this) - { - state = SERVER_STATE.STARTED; - } - // move all data away: - nodeManager.stop(); - moveServerData(); - nodeManager.start(); - synchronized (this) - { - if (closed) - return; - } - - boolean scalingDown = configuration.getHAPolicy().getBackupStrategy() == BackupStrategy.SCALE_DOWN; - - if (!initialisePart1(scalingDown)) - return; - - synchronized (this) - { - if (closed) - return; - backupQuorum = new SharedNothingBackupQuorum(storageManager, nodeManager, scheduledPool); - clusterManager.getQuorumManager().registerQuorum(backupQuorum); - } - - //use a Node Locator to connect to the cluster - LiveNodeLocator nodeLocator; - if (activationParams.get(ActivationParams.REPLICATION_ENDPOINT) != 
null) - { - TopologyMember member = (TopologyMember) activationParams.get(ActivationParams.REPLICATION_ENDPOINT); - nodeLocator = new NamedNodeIdNodeLocator(member.getNodeId(), new Pair<>(member.getLive(), member.getBackup())); - } - else - { - nodeLocator = configuration.getHAPolicy().getBackupGroupName() == null ? - new AnyLiveNodeLocatorForReplication(backupQuorum, HornetQServerImpl.this) : - new NamedLiveNodeLocatorForReplication(configuration.getHAPolicy().getBackupGroupName(), backupQuorum); - } - ClusterController clusterController = clusterManager.getClusterController(); - clusterController.addClusterTopologyListenerForReplication(nodeLocator); - //todo do we actually need to wait? - clusterController.awaitConnectionToReplicationCluster(); - - clusterController.addIncomingInterceptorForReplication(new ReplicationError(HornetQServerImpl.this, nodeLocator)); - - // nodeManager.startBackup(); - - backupManager.start(); - - replicationEndpoint.setBackupQuorum(backupQuorum); - replicationEndpoint.setExecutor(executorFactory.getExecutor()); - EndpointConnector endpointConnector = new EndpointConnector(); - - HornetQServerLogger.LOGGER.backupServerStarted(version.getFullVersion(), nodeManager.getNodeId()); - state = SERVER_STATE.STARTED; - - BACKUP_ACTIVATION signal; - do - { - //locate the first live server to try to replicate - nodeLocator.locateNode(); - if (closed) - { - return; - } - Pair<TransportConfiguration, TransportConfiguration> possibleLive = nodeLocator.getLiveConfiguration(); - nodeID = nodeLocator.getNodeID(); - //in a normal (non failback) scenario if we couldn't find our live server we should fail - if (!attemptFailBack) - { - //this shouldn't happen - if (nodeID == null) - throw new RuntimeException("Could not establish the connection"); - nodeManager.setNodeID(nodeID); - } - - try - { - clusterControl = clusterController.connectToNodeInReplicatedCluster(possibleLive.getA()); - } - catch (Exception e) - { - if (possibleLive.getB() != null) - { - 
try - { - clusterControl = clusterController.connectToNodeInReplicatedCluster(possibleLive.getB()); - } - catch (Exception e1) - { - clusterControl = null; - } - } - } - if (clusterControl == null) - { - //its ok to retry here since we haven't started replication yet - //it may just be the server has gone since discovery - Thread.sleep(clusterController.getRetryIntervalForReplicatedCluster()); - signal = BACKUP_ACTIVATION.ALREADY_REPLICATING; - continue; - } - - threadPool.execute(endpointConnector); - /** - * Wait for a signal from the the quorum manager, at this point if replication has been successful we can - * fail over or if there is an error trying to replicate (such as already replicating) we try the - * process again on the next live server. All the action happens inside {@link BackupQuorum} - */ - signal = backupQuorum.waitForStatusChange(); - /** - * replicationEndpoint will be holding lots of open files. Make sure they get - * closed/sync'ed. - */ - stopComponent(replicationEndpoint); - // time to give up - if (!isStarted() || signal == STOP) - return; - // time to fail over - else if (signal == FAIL_OVER) - break; - // something has gone badly run restart from scratch - else if (signal == BACKUP_ACTIVATION.FAILURE_REPLICATING) - { - Thread startThread = new Thread(new Runnable() - { - @Override - public void run() - { - try - { - stop(); - } - catch (Exception e) - { - HornetQServerLogger.LOGGER.errorRestartingBackupServer(e, HornetQServerImpl.this); - } - } - }); - startThread.start(); - return; - } - //ok, this live is no good, let's reset and try again - //close this session factory, we're done with it - clusterControl.close(); - backupQuorum.reset(); - if (replicationEndpoint.getChannel() != null) - { - replicationEndpoint.getChannel().close(); - replicationEndpoint.setChannel(null); - } - } - while (signal == BACKUP_ACTIVATION.ALREADY_REPLICATING); - - clusterManager.getQuorumManager().unRegisterQuorum(backupQuorum); - - if 
(!isRemoteBackupUpToDate()) - { - throw HornetQMessageBundle.BUNDLE.backupServerNotInSync(); - } - - configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.REPLICATED); - synchronized (HornetQServerImpl.this) - { - if (!isStarted()) - return; - HornetQServerLogger.LOGGER.becomingLive(HornetQServerImpl.this); - nodeManager.stopBackup(); - storageManager.start(); - backupManager.activated(); - initialisePart2(scalingDown); - } - } - catch (Exception e) - { - if ((e instanceof InterruptedException || e instanceof IllegalStateException) && !isStarted()) - // do not log these errors if the server is being stopped. - return; - HornetQServerLogger.LOGGER.initializationError(e); - e.printStackTrace(); - } - } - - public void close(final boolean permanently) throws Exception - { - synchronized (this) - { - if (backupQuorum != null) - backupQuorum.causeExit(STOP); - closed = true; - } - - if (configuration.getHAPolicy().isBackup()) - { - long timeout = 30000; - - long start = System.currentTimeMillis(); - - // To avoid a NPE cause by the stop - NodeManager nodeManagerInUse = nodeManager; - - while (backupActivationThread.isAlive() && System.currentTimeMillis() - start < timeout) - { - - if (nodeManagerInUse != null) - { - nodeManagerInUse.interrupt(); - } - - backupActivationThread.interrupt(); - - Thread.sleep(1000); - } - - if (System.currentTimeMillis() - start >= timeout) - { - HornetQServerLogger.LOGGER.backupActivationProblem(); - } - - if (nodeManagerInUse != null) - { - nodeManagerInUse.stopBackup(); - } - } - } - - /** - * Live has notified this server that it is going to stop. - */ - public void failOver(final LiveStopping finalMessage) - { - if (finalMessage == null) - { - backupQuorum.causeExit(FAILURE_REPLICATING); - } - else - { - backupQuorum.failOver(finalMessage); - } - } - - private class EndpointConnector implements Runnable - { - @Override - public void run() - { - try - { - //we should only try once, if its not there we should move on. 
- clusterControl.getSessionFactory().setReconnectAttempts(1); - backupQuorum.setSessionFactory(clusterControl.getSessionFactory()); - //get the connection and request replication to live - clusterControl.authorize(); - connectToReplicationEndpoint(clusterControl); - replicationEndpoint.start(); - clusterControl.announceReplicatingBackupToLive(attemptFailBack); - } - catch (Exception e) - { - //we shouldn't stop the server just mark the connector as tried and unavailable - HornetQServerLogger.LOGGER.replicationStartProblem(e); - backupQuorum.causeExit(FAILURE_REPLICATING); - } - } - - private synchronized ReplicationEndpoint connectToReplicationEndpoint(final ClusterControl control) throws Exception - { - if (!isStarted()) - return null; - if (!configuration.getHAPolicy().isBackup()) - { - throw HornetQMessageBundle.BUNDLE.serverNotBackupServer(); - } - - Channel replicationChannel = control.createReplicationChannel(); - - replicationChannel.setHandler(replicationEndpoint); - - if (replicationEndpoint.getChannel() != null) - { - throw HornetQMessageBundle.BUNDLE.alreadyHaveReplicationServer(); - } - - replicationEndpoint.setChannel(replicationChannel); - - return replicationEndpoint; - } - } - } - - - private final class SharedNothingLiveActivation implements Activation - { - public void run() - { - try - { - if (configuration.isClustered() && configuration.isCheckForLiveServer() && isNodeIdUsed()) - { - configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_REPLICATED); - return; - } - - initialisePart1(false); - - initialisePart2(false); - - if (identity != null) - { - HornetQServerLogger.LOGGER.serverIsLive(identity); - } - else - { - HornetQServerLogger.LOGGER.serverIsLive(); - } - } - catch (Exception e) - { - HornetQServerLogger.LOGGER.initializationError(e); - } - } - - /** - * Determines whether there is another server already running with this server's nodeID. 
- * <p/> - * This can happen in case of a successful fail-over followed by the live's restart - * (attempting a fail-back). - * - * @throws Exception - */ - private boolean isNodeIdUsed() throws Exception - { - if (configuration.getClusterConfigurations().isEmpty()) - return false; - SimpleString nodeId0; - try - { - nodeId0 = nodeManager.readNodeId(); - } - catch (HornetQIllegalStateException e) - { - nodeId0 = null; - } - - ServerLocatorInternal locator; - - ClusterConnectionConfiguration config = ConfigurationUtils.getReplicationClusterConfiguration(configuration); - - locator = getLocator(config); - - ClientSessionFactoryInternal factory = null; - - NodeIdListener listener = new NodeIdListener(nodeId0); - - locator.addClusterTopologyListener(listener); - try - { - locator.setReconnectAttempts(0); - try - { - locator.addClusterTopologyListener(listener); - factory = locator.connectNoWarnings(); - } - catch (Exception notConnected) - { - return false; - } - - listener.latch.await(5, TimeUnit.SECONDS); - - return listener.isNodePresent; - } - finally - { - if (factory != null) - factory.close(); - if (locator != null) - locator.close(); - } - } - - public void close(boolean permanently) throws Exception - { - // To avoid a NPE cause by the stop - NodeManager nodeManagerInUse = nodeManager; - - if (nodeManagerInUse != null) - { - if (permanently) - { - nodeManagerInUse.crashLiveServer(); - } - else - { - nodeManagerInUse.pauseLiveServer(); - } - } - } - } - - static final class NodeIdListener implements ClusterTopologyListener - { - volatile boolean isNodePresent = false; - - private final SimpleString nodeId; - private final CountDownLatch latch = new CountDownLatch(1); - - public NodeIdListener(SimpleString nodeId) - { - this.nodeId = nodeId; - } - - @Override - public void nodeUP(TopologyMember topologyMember, boolean last) - { - boolean isOurNodeId = nodeId != null && nodeId.toString().equals(topologyMember.getNodeId()); - if (isOurNodeId) - { - isNodePresent = 
true; - } - if (isOurNodeId || last) - { - latch.countDown(); - } - } - - @Override - public void nodeDown(long eventUID, String nodeID) - { - // no-op - } - } - - private TransportConfiguration[] connectorNameListToArray(final List<String> connectorNames) - { - TransportConfiguration[] tcConfigs = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, - connectorNames.size()); - int count = 0; - for (String connectorName : connectorNames) - { - TransportConfiguration connector = configuration.getConnectorConfigurations().get(connectorName); - - if (connector == null) - { - HornetQServerLogger.LOGGER.bridgeNoConnector(connectorName); - - return null; - } - - tcConfigs[count++] = connector; - } - - return tcConfigs; - } - - /** - * This seems duplicate code all over the place, but for security reasons we can't let something like this to be open in a - * utility class, as it would be a door to load anything you like in a safe VM. - * For that reason any class trying to do a privileged block should do with the AccessController directly. 
- */ - private static Object safeInitNewInstance(final String className) - { - return AccessController.doPrivileged(new PrivilegedAction<Object>() - { - public Object run() + public Object run() { return ClassloadingUtil.newInstanceFromClassLoader(className); } }); } - @Override - public void startReplication(CoreRemotingConnection rc, final ClusterConnection clusterConnection, - final Pair<TransportConfiguration, TransportConfiguration> pair, final boolean isFailBackRequest) throws HornetQException - { - if (replicationManager != null) - { - throw new HornetQAlreadyReplicatingException(); - } - - if (!isStarted()) - { - throw new HornetQIllegalStateException(); - } - - synchronized (replicationLock) - { - - if (replicationManager != null) - { - throw new HornetQAlreadyReplicatingException(); - } - ReplicationFailureListener listener = new ReplicationFailureListener(); - rc.addCloseListener(listener); - rc.addFailureListener(listener); - replicationManager = new ReplicationManager(rc, executorFactory); - replicationManager.start(); - Thread t = new Thread(new Runnable() - { - public void run() - { - try - { - storageManager.startReplication(replicationManager, pagingManager, getNodeID().toString(), - isFailBackRequest && configuration.getHAPolicy().isAllowAutoFailBack()); - clusterConnection.nodeAnnounced(System.currentTimeMillis(), getNodeID().toString(), configuration.getHAPolicy().getBackupGroupName(), clusterManager.getHAManager().getHAPolicy().getScaleDownGroupName(), pair, true); - - backupUpToDate = false; - - if (isFailBackRequest && configuration.getHAPolicy().isAllowAutoFailBack()) - { - BackupTopologyListener listener1 = new BackupTopologyListener(getNodeID().toString()); - clusterConnection.addClusterTopologyListener(listener1); - if (listener1.waitForBackup()) - { - try - { - Thread.sleep(configuration.getHAPolicy().getFailbackDelay()); - } - catch (InterruptedException e) - { - // - } - //if we have to many backups kept or arent configured to restart 
just stop, otherwise restart as a backup - if (!configuration.getHAPolicy().isRestartBackup() && countNumberOfCopiedJournals() >= configuration.getMaxSavedReplicatedJournalsSize() && configuration.getMaxSavedReplicatedJournalsSize() >= 0) - { - stop(true); - HornetQServerLogger.LOGGER.stopReplicatedBackupAfterFailback(); - } - else - { - stop(true); - HornetQServerLogger.LOGGER.restartingReplicatedBackupAfterFailback(); - configuration.getHAPolicy().setPolicyType(HAPolicy.POLICY_TYPE.BACKUP_REPLICATED); - start(); - } - } - else - { - HornetQServerLogger.LOGGER.failbackMissedBackupAnnouncement(); - } - } - } - catch (Exception e) - { - if (state == HornetQServerImpl.SERVER_STATE.STARTED) - { - /* - * The reasoning here is that the exception was either caused by (1) the - * (interaction with) the backup, or (2) by an IO Error at the storage. If (1), we - * can swallow the exception and ignore the replication request. If (2) the live - * will crash shortly. - */ - HornetQServerLogger.LOGGER.errorStartingReplication(e); - } - try - { - stopComponent(replicationManager); - } - catch (Exception hqe) - { - HornetQServerLogger.LOGGER.errorStoppingReplication(hqe); - } - finally - { - synchronized (replicationLock) - { - replicationManager = null; - } - } - } - } - }); - - t.start(); - } - } - - /** - * Whether a remote backup server was in sync with its live server. If it was not in sync, it may - * not take over the live's functions. - * <p/> - * A local backup server or a live server should always return {@code true} - * - * @return whether the backup is up-to-date, if the server is not a backup it always returns - * {@code true}. 
- */ - public boolean isRemoteBackupUpToDate() - { - return backupUpToDate; - } - - public void setRemoteBackupUpToDate() - { - backupManager.announceBackup(); - backupUpToDate = true; - backupSyncLatch.countDown(); - } - public void addProtocolManagerFactory(ProtocolManagerFactory factory) { protocolManagerFactories.add(factory); @@ -3186,7 +2237,7 @@ public class HornetQServerImpl implements HornetQServer } - private int countNumberOfCopiedJournals() + int countNumberOfCopiedJournals() { //will use the main journal to check for how many backups have been kept File journalDir = new File(configuration.getJournalDirectory()); @@ -3209,98 +2260,13 @@ public class HornetQServerImpl implements HornetQServer return numberOfbackupsSaved; } - private final class ReplicationFailureListener implements FailureListener, CloseListener - { - - @Override - public void connectionFailed(HornetQException exception, boolean failedOver) - { - connectionClosed(); - } - - @Override - public void connectionFailed(final HornetQException me, boolean failedOver, String scaleDownTargetNodeID) - { - connectionFailed(me, failedOver); - } - - @Override - public void connectionClosed() - { - threadPool.execute(new Runnable() - { - public void run() - { - synchronized (replicationLock) - { - if (replicationManager != null) - { - storageManager.stopReplication(); - replicationManager = null; - } - } - } - }); - } - } - - /** - * @throws HornetQException - */ - public void remoteFailOver(ReplicationLiveIsStoppingMessage.LiveStopping finalMessage) throws HornetQException - { - HornetQServerLogger.LOGGER.trace("Remote fail-over, got message=" + finalMessage + ", backupUpToDate=" + - backupUpToDate); - if (!configuration.getHAPolicy().isBackup() || configuration.getHAPolicy().isSharedStore()) - { - throw new HornetQInternalErrorException(); - } - if (activation instanceof SharedNothingBackupActivation) - { - final SharedNothingBackupActivation replicationActivation = ((SharedNothingBackupActivation) 
activation); - - if (!backupUpToDate) - { - replicationActivation.failOver(null); - } - else - { - replicationActivation.failOver(finalMessage); - } - } - } - - - private ServerLocatorInternal getLocator(ClusterConnectionConfiguration config) throws HornetQException - { - ServerLocatorInternal locator; - if (config.getDiscoveryGroupName() != null) - { - DiscoveryGroupConfiguration dg = configuration.getDiscoveryGroupConfigurations().get(config.getDiscoveryGroupName()); - - if (dg == null) - { - throw HornetQMessageBundle.BUNDLE.noDiscoveryGroupFound(dg); - } - locator = (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(dg); - } - else - { - TransportConfiguration[] tcConfigs = config.getStaticConnectors() != null ? connectorNameListToArray(config.getStaticConnectors()) - : null; - - locator = (ServerLocatorInternal) HornetQClient.createServerLocatorWithHA(tcConfigs); - } - return locator; - } - /** * Move data away before starting data synchronization for fail-back. * <p/> * Use case is a server, upon restarting, finding a former backup running in its place. It will * move any older data away and log a warning about it. */ - private void moveServerData() + void moveServerData() { String[] dataDirs = new String[]{configuration.getBindingsDirectory(),
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/LiveActivation.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/LiveActivation.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/LiveActivation.java new file mode 100644 index 0000000..c863c66 --- /dev/null +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/LiveActivation.java @@ -0,0 +1,17 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.server.impl; + +public abstract class LiveActivation extends Activation +{ +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-server/src/main/java/org/hornetq/core/server/impl/LiveOnlyActivation.java ---------------------------------------------------------------------- diff --git a/hornetq-server/src/main/java/org/hornetq/core/server/impl/LiveOnlyActivation.java b/hornetq-server/src/main/java/org/hornetq/core/server/impl/LiveOnlyActivation.java new file mode 100644 index 0000000..ceb60d1 --- /dev/null +++ b/hornetq-server/src/main/java/org/hornetq/core/server/impl/LiveOnlyActivation.java @@ -0,0 +1,197 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
package org.hornetq.core.server.impl;

import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.TopologyMember;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.server.HornetQServerLogger;
import org.hornetq.core.server.LiveNodeLocator;
import org.hornetq.core.server.cluster.HornetQServerSideProtocolManagerFactory;
import org.hornetq.core.server.cluster.ha.LiveOnlyPolicy;
import org.hornetq.core.server.cluster.ha.ScaleDownPolicy;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

/**
 * Activation that initialises the server as live and, when the configured
 * {@link LiveOnlyPolicy} carries an enabled {@link ScaleDownPolicy}, connects to a
 * scale-down target before connections are frozen and runs the scale-down afterwards.
 */
public class LiveOnlyActivation extends Activation
{
   /** How we act when we initially start as live; also the source of the scale-down policy. */
   private final LiveOnlyPolicy liveOnlyPolicy;

   private final HornetQServerImpl hornetQServer;

   /** Locator used to reach the scale-down target; {@code null} while no connection attempt is outstanding. */
   private ServerLocatorInternal scaleDownServerLocator;

   /** Session factory connected to the scale-down target; {@code null} until a connection succeeds. */
   private ClientSessionFactoryInternal scaleDownClientSessionFactory;

   /**
    * @param server         the server this activation initialises
    * @param liveOnlyPolicy the policy consulted for scale-down settings
    */
   public LiveOnlyActivation(HornetQServerImpl server, LiveOnlyPolicy liveOnlyPolicy)
   {
      this.hornetQServer = server;
      this.liveOnlyPolicy = liveOnlyPolicy;
   }

   /**
    * Runs both server initialisation parts and logs that the server is live.
    * Any exception is logged as an initialisation error rather than propagated.
    */
   public void run()
   {
      try
      {
         hornetQServer.initialisePart1(false);

         hornetQServer.initialisePart2(false);

         if (hornetQServer.getIdentity() != null)
         {
            HornetQServerLogger.LOGGER.serverIsLive(hornetQServer.getIdentity());
         }
         else
         {
            HornetQServerLogger.LOGGER.serverIsLive();
         }
      }
      catch (Exception e)
      {
         HornetQServerLogger.LOGGER.initializationError(e);
      }
   }

   /**
    * Releases whatever scale-down connection resources are still held.
    *
    * @param permanently not used by this implementation
    * @param restarting  not used by this implementation
    * @throws Exception if closing a resource fails
    */
   @Override
   public void close(boolean permanently, boolean restarting) throws Exception
   {
      releaseScaleDownResources();
   }

   /**
    * Connects to the scale-down target (when scale-down is enabled) and then freezes the
    * remoting service, passing along the node id of the target when one is known.
    *
    * @param remotingService the service to freeze; nothing is frozen when {@code null}
    */
   public void freezeConnections(RemotingService remotingService)
   {
      // connect to the scale-down target first so that when we freeze/disconnect the clients we can tell them where
      // we're sending the messages
      ScaleDownPolicy scaleDownPolicy = liveOnlyPolicy.getScaleDownPolicy();
      if (scaleDownPolicy != null && scaleDownPolicy.isEnabled())
      {
         connectToScaleDownTarget(scaleDownPolicy);
      }

      if (remotingService != null)
      {
         remotingService.freeze(resolveScaleDownNodeId(), null);
      }
   }

   /**
    * Runs the scale-down when it is enabled and a target connection exists, then releases
    * the connection resources whether or not the scale-down succeeded.
    */
   @Override
   public void postConnectionFreeze()
   {
      ScaleDownPolicy scaleDownPolicy = liveOnlyPolicy.getScaleDownPolicy();
      if (scaleDownPolicy == null || !scaleDownPolicy.isEnabled() || scaleDownClientSessionFactory == null)
      {
         return;
      }

      try
      {
         scaleDown();
      }
      catch (Exception e)
      {
         HornetQServerLogger.LOGGER.failedToScaleDown(e);
      }
      finally
      {
         releaseScaleDownResources();
      }
   }

   /**
    * Creates the scale-down locator, locates a live node and tries each offered live
    * connector until a session factory is created. Failures are logged, not propagated;
    * on failure {@code scaleDownClientSessionFactory} is left unchanged.
    *
    * @param scaleDownPolicy the policy describing the scale-down target
    */
   public void connectToScaleDownTarget(ScaleDownPolicy scaleDownPolicy)
   {
      try
      {
         scaleDownServerLocator = ScaleDownPolicy.getScaleDownConnector(scaleDownPolicy, hornetQServer);
         //use a Node Locator to connect to the cluster
         scaleDownServerLocator.setProtocolManagerFactory(HornetQServerSideProtocolManagerFactory.getInstance());
         LiveNodeLocator nodeLocator = scaleDownPolicy.getGroupName() == null ?
            new AnyLiveNodeLocatorForScaleDown(hornetQServer) :
            new NamedLiveNodeLocatorForScaleDown(scaleDownPolicy.getGroupName(), hornetQServer);
         scaleDownServerLocator.addClusterTopologyListener(nodeLocator);

         nodeLocator.connectToCluster(scaleDownServerLocator);
         // a timeout is necessary here in case we use a NamedLiveNodeLocatorForScaleDown and there's no matching node in the cluster
         // should the timeout be configurable?
         nodeLocator.locateNode(HornetQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT);

         ClientSessionFactoryInternal clientSessionFactory = null;
         while (clientSessionFactory == null)
         {
            Pair<TransportConfiguration, TransportConfiguration> possibleLive = null;
            try
            {
               possibleLive = nodeLocator.getLiveConfiguration();
               if (possibleLive == null) // we've tried every connector
               {
                  break;
               }
               clientSessionFactory = (ClientSessionFactoryInternal) scaleDownServerLocator.createSessionFactory(possibleLive.getA(), 0, false);
            }
            catch (Exception e)
            {
               // possibleLive is still null when getLiveConfiguration() itself threw, so it must not be
               // dereferenced blindly here. clientSessionFactory is necessarily null at this point (its
               // only assignment is the last statement of the try), so there is nothing to close.
               Object target = possibleLive == null ? "<no live configuration>" : possibleLive.getA();
               HornetQServerLogger.LOGGER.trace("Failed to connect to " + target);
               nodeLocator.notifyRegistrationFailed(false);
               // should I try the backup (i.e. getB()) from possibleLive?
            }
         }

         if (clientSessionFactory == null)
         {
            throw new HornetQException("Unable to connect to server for scale-down");
         }
         scaleDownClientSessionFactory = clientSessionFactory;
      }
      catch (Exception e)
      {
         HornetQServerLogger.LOGGER.failedToScaleDown(e);
      }
   }


   /**
    * Collects the contents of every duplicate-ID cache known to the post office and delegates
    * to a {@link ScaleDownHandler} using the current scale-down session factory.
    *
    * @return the value returned by {@code ScaleDownHandler.scaleDown}
    * @throws Exception if the handler fails
    */
   public long scaleDown() throws Exception
   {
      ScaleDownHandler scaleDownHandler = new ScaleDownHandler(hornetQServer.getPagingManager(),
                                                               hornetQServer.getPostOffice(),
                                                               hornetQServer.getNodeManager(),
                                                               hornetQServer.getClusterManager().getClusterController());
      ConcurrentMap<SimpleString, DuplicateIDCache> duplicateIDCaches = ((PostOfficeImpl) hornetQServer.getPostOffice()).getDuplicateIDCaches();
      Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<>();
      // iterate entries directly instead of re-looking up each cache by address
      for (Map.Entry<SimpleString, DuplicateIDCache> entry : duplicateIDCaches.entrySet())
      {
         duplicateIDMap.put(entry.getKey(), entry.getValue().getMap());
      }
      return scaleDownHandler.scaleDown(scaleDownClientSessionFactory, hornetQServer.getResourceManager(), duplicateIDMap,
                                        hornetQServer.getManagementService().getManagementAddress(), null);
   }

   /**
    * Looks up the node id of the scale-down target from the locator's topology.
    *
    * @return the node id, or {@code null} when there is no session factory, no connector
    *         configuration, or no topology member for that connector
    */
   private String resolveScaleDownNodeId()
   {
      if (scaleDownClientSessionFactory == null)
      {
         return null;
      }
      TransportConfiguration tc = scaleDownClientSessionFactory.getConnectorConfiguration();
      if (tc == null)
      {
         return null;
      }
      TopologyMember member = scaleDownClientSessionFactory.getServerLocator().getTopology().getMember(tc);
      return member == null ? null : member.getNodeId();
   }

   /**
    * Closes the scale-down session factory and locator, if present, and clears both references
    * so they are neither closed twice nor reused after being closed. The locator is closed even
    * when closing the session factory throws.
    */
   private void releaseScaleDownResources()
   {
      try
      {
         if (scaleDownClientSessionFactory != null)
         {
            scaleDownClientSessionFactory.close();
         }
      }
      finally
      {
         scaleDownClientSessionFactory = null;
         if (scaleDownServerLocator != null)
         {
            scaleDownServerLocator.close();
            scaleDownServerLocator = null;
         }
      }
   }
}
