Repository: activemq-6 Updated Branches: refs/heads/master 886180293 -> 3d749a431
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java new file mode 100644 index 0000000..2b42e72 --- /dev/null +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java @@ -0,0 +1,2345 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.core.server.impl; + +import javax.management.MBeanServer; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.lang.management.ManagementFactory; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.api.core.Pair; +import org.apache.activemq.api.core.SimpleString; +import org.apache.activemq.core.asyncio.impl.AsynchronousFileImpl; +import org.apache.activemq.core.client.impl.ClientSessionFactoryImpl; +import org.apache.activemq.core.config.BridgeConfiguration; +import org.apache.activemq.core.config.Configuration; +import org.apache.activemq.core.config.ConfigurationUtils; +import org.apache.activemq.core.config.CoreQueueConfiguration; +import org.apache.activemq.core.config.DivertConfiguration; +import org.apache.activemq.core.config.impl.ConfigurationImpl; +import org.apache.activemq.core.deployers.Deployer; +import org.apache.activemq.core.deployers.DeploymentManager; +import org.apache.activemq.core.deployers.impl.AddressSettingsDeployer; +import org.apache.activemq.core.deployers.impl.BasicUserCredentialsDeployer; +import org.apache.activemq.core.deployers.impl.FileDeploymentManager; +import org.apache.activemq.core.deployers.impl.QueueDeployer; +import org.apache.activemq.core.deployers.impl.SecurityDeployer; +import org.apache.activemq.core.filter.Filter; +import org.apache.activemq.core.filter.impl.FilterImpl; +import org.apache.activemq.core.journal.IOCriticalErrorListener; +import org.apache.activemq.core.journal.JournalLoadInformation; +import org.apache.activemq.core.journal.SequentialFile; +import org.apache.activemq.core.journal.impl.AIOSequentialFileFactory; +import org.apache.activemq.core.journal.impl.SyncSpeedTest; +import org.apache.activemq.core.management.impl.ActiveMQServerControlImpl; +import org.apache.activemq.core.paging.PagingManager; +import org.apache.activemq.core.paging.cursor.PageSubscription; +import org.apache.activemq.core.paging.impl.PagingManagerImpl; +import org.apache.activemq.core.paging.impl.PagingStoreFactoryNIO; +import org.apache.activemq.core.persistence.GroupingInfo; +import org.apache.activemq.core.persistence.OperationContext; +import org.apache.activemq.core.persistence.QueueBindingInfo; +import org.apache.activemq.core.persistence.StorageManager; +import org.apache.activemq.core.persistence.config.PersistedAddressSetting; +import org.apache.activemq.core.persistence.config.PersistedRoles; +import org.apache.activemq.core.persistence.impl.PageCountPending; +import org.apache.activemq.core.persistence.impl.journal.JournalStorageManager; +import org.apache.activemq.core.persistence.impl.journal.OperationContextImpl; +import org.apache.activemq.core.persistence.impl.nullpm.NullStorageManager; +import org.apache.activemq.core.postoffice.Binding; +import org.apache.activemq.core.postoffice.PostOffice; +import org.apache.activemq.core.postoffice.QueueBinding; +import org.apache.activemq.core.postoffice.impl.DivertBinding; +import org.apache.activemq.core.postoffice.impl.LocalQueueBinding; +import org.apache.activemq.core.postoffice.impl.PostOfficeImpl; +import org.apache.activemq.core.remoting.server.RemotingService; +import org.apache.activemq.core.remoting.server.impl.RemotingServiceImpl; +import org.apache.activemq.core.replication.ReplicationManager; +import org.apache.activemq.core.security.CheckType; +import org.apache.activemq.core.security.Role; +import org.apache.activemq.core.security.SecurityStore; +import org.apache.activemq.core.security.impl.SecurityStoreImpl; +import org.apache.activemq.core.server.ActivateCallback; +import org.apache.activemq.core.server.ActiveMQServerLogger; +import org.apache.activemq.core.server.Bindable; +import org.apache.activemq.core.server.Divert; +import org.apache.activemq.core.server.ActiveMQComponent; +import org.apache.activemq.core.server.ActiveMQMessageBundle; +import org.apache.activemq.core.server.ActiveMQServer; +import org.apache.activemq.core.server.JournalType; +import org.apache.activemq.core.server.LargeServerMessage; +import org.apache.activemq.core.server.MemoryManager; +import org.apache.activemq.core.server.NodeManager; +import org.apache.activemq.core.server.Queue; +import org.apache.activemq.core.server.QueueFactory; +import org.apache.activemq.core.server.ServerSession; +import org.apache.activemq.core.server.ServerSessionFactory; +import org.apache.activemq.core.server.cluster.BackupManager; +import org.apache.activemq.core.server.cluster.ClusterManager; +import org.apache.activemq.core.server.cluster.Transformer; +import org.apache.activemq.core.server.cluster.ha.HAPolicy; +import org.apache.activemq.core.server.group.GroupingHandler; +import org.apache.activemq.core.server.group.impl.GroupingHandlerConfiguration; +import org.apache.activemq.core.server.group.impl.LocalGroupingHandler; +import org.apache.activemq.core.server.group.impl.RemoteGroupingHandler; +import org.apache.activemq.core.server.management.ManagementService; +import org.apache.activemq.core.server.management.impl.ManagementServiceImpl; +import org.apache.activemq.core.settings.HierarchicalRepository; +import org.apache.activemq.core.settings.impl.AddressSettings; +import org.apache.activemq.core.settings.impl.HierarchicalObjectRepository; +import org.apache.activemq.core.transaction.ResourceManager; +import org.apache.activemq.core.transaction.impl.ResourceManagerImpl; +import org.apache.activemq.core.version.Version; +import org.apache.activemq.spi.core.protocol.ProtocolManagerFactory; +import org.apache.activemq.spi.core.protocol.RemotingConnection; +import org.apache.activemq.spi.core.protocol.SessionCallback; +import org.apache.activemq.spi.core.security.ActiveMQSecurityManager; +import org.apache.activemq.utils.ClassloadingUtil; +import org.apache.activemq.utils.ConcurrentHashSet; +import org.apache.activemq.utils.ExecutorFactory; +import org.apache.activemq.utils.ActiveMQThreadFactory; +import org.apache.activemq.utils.OrderedExecutorFactory; +import org.apache.activemq.utils.ReusableLatch; +import org.apache.activemq.utils.SecurityFormatter; +import org.apache.activemq.utils.VersionLoader; + +/** + * The ActiveMQ server implementation + * + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]>Andy Taylor</a> + * @author <a href="mailto:[email protected]>Martyn Taylor</a> + */ +public class ActiveMQServerImpl implements ActiveMQServer +{ + /** + * JMS Topics (which are outside of the scope of the core API) will require a dumb subscription + * with a dummy-filter at this current version as a way to keep its existence valid and TCK + * tests. That subscription needs an invalid filter, however paging needs to ignore any + * subscription with this filter. For that reason, this filter needs to be rejected on paging or + * any other component on the system, and just be ignored for any purpose It's declared here as + * this filter is considered a global ignore + */ + public static final String GENERIC_IGNORED_FILTER = "__HQX=-1"; + + private HAPolicy haPolicy; + + enum SERVER_STATE + { + /** + * start() has been called but components are not initialized. The whole point of this state, + * is to be in a state which is different from {@link SERVER_STATE#STARTED} and + * {@link SERVER_STATE#STOPPED}, so that methods testing for these two values such as + * {@link #stop(boolean)} worked as intended. + */ + STARTING, + /** + * server is started. {@code server.isStarted()} returns {@code true}, and all assumptions + * about it hold. + */ + STARTED, + /** + * stop() was called but has not finished yet. Meant to avoids starting components while + * stop() is executing. + */ + STOPPING, + /** + * Stopped: either stop() has been called and has finished running, or start() has never been + * called. + */ + STOPPED; + } + + private volatile SERVER_STATE state = SERVER_STATE.STOPPED; + + private final Version version; + + private final ActiveMQSecurityManager securityManager; + + private final Configuration configuration; + + private final MBeanServer mbeanServer; + + private volatile SecurityStore securityStore; + + private final HierarchicalRepository<AddressSettings> addressSettingsRepository; + + private volatile QueueFactory queueFactory; + + private volatile PagingManager pagingManager; + + private volatile PostOffice postOffice; + + private volatile ExecutorService threadPool; + + private volatile ScheduledExecutorService scheduledPool; + + private volatile ExecutorFactory executorFactory; + + private final HierarchicalRepository<Set<Role>> securityRepository; + + private volatile ResourceManager resourceManager; + + private volatile ActiveMQServerControlImpl messagingServerControl; + + private volatile ClusterManager clusterManager; + + private volatile BackupManager backupManager; + + private volatile StorageManager storageManager; + + private volatile RemotingService remotingService; + + private final List<ProtocolManagerFactory> protocolManagerFactories = new ArrayList<>(); + + private volatile ManagementService managementService; + + private volatile ConnectorsService connectorsService; + + private MemoryManager memoryManager; + + private volatile DeploymentManager deploymentManager; + + private Deployer basicUserCredentialsDeployer; + private Deployer addressSettingsDeployer; + private Deployer queueDeployer; + private Deployer securityDeployer; + + private final Map<String, ServerSession> sessions = new ConcurrentHashMap<String, ServerSession>(); + + /** + * This class here has the same principle of CountDownLatch but you can reuse the counters. + * It's based on the same super classes of {@code CountDownLatch} + */ + private final ReusableLatch activationLatch = new ReusableLatch(0); + + private final Set<ActivateCallback> activateCallbacks = new ConcurrentHashSet<ActivateCallback>(); + + private volatile GroupingHandler groupingHandler; + + private NodeManager nodeManager; + + // Used to identify the server on tests... useful on debugging testcases + private String identity; + + private Thread backupActivationThread; + + private Activation activation; + + private Map<String, Object> activationParams = new HashMap<>(); + + private final ShutdownOnCriticalErrorListener shutdownOnCriticalIO = new ShutdownOnCriticalErrorListener(); + + private final ActiveMQServer parentServer; + + //todo think about moving this to the activation + private final List<SimpleString> scaledDownNodeIDs = new ArrayList<>(); + + private boolean threadPoolSupplied = false; + + private boolean scheduledPoolSupplied = false; + + private ServiceRegistry serviceRegistry; + + // Constructors + // --------------------------------------------------------------------------------- + + public ActiveMQServerImpl() + { + this(null, null, null); + } + + public ActiveMQServerImpl(final Configuration configuration) + { + this(configuration, null, null); + } + + public ActiveMQServerImpl(final Configuration configuration, ActiveMQServer parentServer) + { + this(configuration, null, null, parentServer); + } + + public ActiveMQServerImpl(final Configuration configuration, final MBeanServer mbeanServer) + { + this(configuration, mbeanServer, null); + } + + public ActiveMQServerImpl(final Configuration configuration, final ActiveMQSecurityManager securityManager) + { + this(configuration, null, securityManager); + } + + public ActiveMQServerImpl(Configuration configuration, + MBeanServer mbeanServer, + final ActiveMQSecurityManager securityManager) + { + this(configuration, mbeanServer, securityManager, null); + } + + public ActiveMQServerImpl(Configuration configuration, + MBeanServer mbeanServer, + final ActiveMQSecurityManager securityManager, + final ActiveMQServer parentServer) + { + this(configuration, mbeanServer, securityManager, parentServer, null); + } + + public ActiveMQServerImpl(Configuration configuration, + MBeanServer mbeanServer, + final ActiveMQSecurityManager securityManager, + final ActiveMQServer parentServer, + final ServiceRegistry serviceRegistry) + { + if (configuration == null) + { + configuration = new ConfigurationImpl(); + } + if (mbeanServer == null) + { + // Just use JVM mbean server + mbeanServer = ManagementFactory.getPlatformMBeanServer(); + } + + // We need to hard code the version information into a source file + + version = VersionLoader.getVersion(); + + this.configuration = configuration; + + this.mbeanServer = mbeanServer; + + this.securityManager = securityManager; + + addressSettingsRepository = new HierarchicalObjectRepository<AddressSettings>(); + + addressSettingsRepository.setDefault(new AddressSettings()); + + securityRepository = new HierarchicalObjectRepository<Set<Role>>(); + + securityRepository.setDefault(new HashSet<Role>()); + + this.parentServer = parentServer; + + this.serviceRegistry = serviceRegistry == null ? new ServiceRegistry() : serviceRegistry; + } + + // life-cycle methods + // ---------------------------------------------------------------- + + /* + * Can be overridden for tests + */ + protected NodeManager createNodeManager(final String directory, boolean replicatingBackup) + { + NodeManager manager; + if (!configuration.isPersistenceEnabled()) + { + manager = new InVMNodeManager(replicatingBackup); + } + else if (configuration.getJournalType() == JournalType.ASYNCIO && AsynchronousFileImpl.isLoaded()) + { + manager = new AIOFileLockNodeManager(directory, replicatingBackup, configuration.getJournalLockAcquisitionTimeout()); + } + else + { + manager = new FileLockNodeManager(directory, replicatingBackup, configuration.getJournalLockAcquisitionTimeout()); + } + return manager; + } + + public final synchronized void start() throws Exception + { + if (state != SERVER_STATE.STOPPED) + { + ActiveMQServerLogger.LOGGER.debug("Server already started!"); + return; + } + + state = SERVER_STATE.STARTING; + + if (haPolicy == null) + { + haPolicy = ConfigurationUtils.getHAPolicy(configuration.getHAPolicyConfiguration()); + } + + activationLatch.setCount(1); + + ActiveMQServerLogger.LOGGER.debug("Starting server " + this); + + OperationContextImpl.clearContext(); + + try + { + checkJournalDirectory(); + + nodeManager = createNodeManager(configuration.getJournalDirectory(), false); + + nodeManager.start(); + + ActiveMQServerLogger.LOGGER.serverStarting((haPolicy.isBackup() ? "backup" : "live"), configuration); + + if (configuration.isRunSyncSpeedTest()) + { + SyncSpeedTest test = new SyncSpeedTest(); + + test.run(); + } + + final boolean wasLive = !haPolicy.isBackup(); + if (!haPolicy.isBackup()) + { + activation = haPolicy.createActivation(this, false, activationParams, shutdownOnCriticalIO); + + activation.run(); + } + // The activation on fail-back may change the value of isBackup, for that reason we are + // checking again here + if (haPolicy.isBackup()) + { + if (haPolicy.isSharedStore()) + { + activation = haPolicy.createActivation(this, false, activationParams, shutdownOnCriticalIO); + } + else + { + activation = haPolicy.createActivation(this, wasLive, activationParams, shutdownOnCriticalIO); + } + + backupActivationThread = new Thread(activation, ActiveMQMessageBundle.BUNDLE.activationForServer(this)); + backupActivationThread.start(); + } + else + { + state = SERVER_STATE.STARTED; + ActiveMQServerLogger.LOGGER.serverStarted(getVersion().getFullVersion(), nodeManager.getNodeId(), + identity != null ? identity : ""); + } + // start connector service + connectorsService = new ConnectorsService(configuration, storageManager, scheduledPool, postOffice, serviceRegistry); + connectorsService.start(); + } + finally + { + // this avoids embedded applications using dirty contexts from startup + OperationContextImpl.clearContext(); + } + } + + @Override + protected final void finalize() throws Throwable + { + if (state != SERVER_STATE.STOPPED) + { + ActiveMQServerLogger.LOGGER.serverFinalisedWIthoutBeingSTopped(); + + stop(); + } + + super.finalize(); + } + + public void setState(SERVER_STATE state) + { + this.state = state; + } + + public SERVER_STATE getState() + { + return state; + } + + public void interrupBackupThread(NodeManager nodeManagerInUse) throws InterruptedException + { + long timeout = 30000; + + long start = System.currentTimeMillis(); + + while (backupActivationThread.isAlive() && System.currentTimeMillis() - start < timeout) + { + if (nodeManagerInUse != null) + { + nodeManagerInUse.interrupt(); + } + + backupActivationThread.interrupt(); + + backupActivationThread.join(1000); + + } + + if (System.currentTimeMillis() - start >= timeout) + { + threadDump("Timed out waiting for backup activation to exit"); + } + } + + public void resetNodeManager() throws Exception + { + nodeManager.stop(); + nodeManager = + createNodeManager(configuration.getJournalDirectory(), true); + } + + public Activation getActivation() + { + return activation; + } + + @Override + public HAPolicy getHAPolicy() + { + return haPolicy; + } + + @Override + public void setHAPolicy(HAPolicy haPolicy) + { + this.haPolicy = haPolicy; + } + + public ExecutorService getThreadPool() + { + return threadPool; + } + + public void setActivation(SharedNothingLiveActivation activation) + { + this.activation = activation; + } + /** + * Stops the server in a different thread. + */ + public final void stopTheServer(final boolean criticalIOError) + { + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.submit(new Runnable() + { + @Override + public void run() + { + try + { + stop(false, criticalIOError, false); + } + catch (Exception e) + { + ActiveMQServerLogger.LOGGER.errorStoppingServer(e); + } + } + }); + } + + public final void stop() throws Exception + { + stop(false); + } + + public void addActivationParam(String key, Object val) + { + activationParams.put(key, val); + } + @Override + public boolean isAddressBound(String address) throws Exception + { + return postOffice.isAddressBound(SimpleString.toSimpleString(address)); + } + + public void threadDump(final String reason) + { + StringWriter str = new StringWriter(); + PrintWriter out = new PrintWriter(str); + + Map<Thread, StackTraceElement[]> stackTrace = Thread.getAllStackTraces(); + + out.println(ActiveMQMessageBundle.BUNDLE.generatingThreadDump(reason)); + out.println("*******************************************************************************"); + + for (Map.Entry<Thread, StackTraceElement[]> el : stackTrace.entrySet()) + { + out.println("==============================================================================="); + out.println(ActiveMQMessageBundle.BUNDLE.threadDump(el.getKey(), el.getKey().getName(), el.getKey().getId(), el.getKey().getThreadGroup())); + out.println(); + for (StackTraceElement traceEl : el.getValue()) + { + out.println(traceEl); + } + } + + out.println("==============================================================================="); + out.println(ActiveMQMessageBundle.BUNDLE.endThreadDump()); + out.println("*******************************************************************************"); + + ActiveMQServerLogger.LOGGER.warn(str.toString()); + } + + public final void stop(boolean failoverOnServerShutdown) throws Exception + { + stop(failoverOnServerShutdown, false, false); + } + + /** + * Stops the server + * @param criticalIOError whether we have encountered an IO error with the journal etc + */ + void stop(boolean failoverOnServerShutdown, final boolean criticalIOError, boolean restarting) throws Exception + { + synchronized (this) + { + if (state == SERVER_STATE.STOPPED || state == SERVER_STATE.STOPPING) + { + return; + } + state = SERVER_STATE.STOPPING; + + activation.sendLiveIsStopping(); + + stopComponent(connectorsService); + + // we stop the groupingHandler before we stop the cluster manager so binding mappings + // aren't removed in case of failover + if (groupingHandler != null) + { + managementService.removeNotificationListener(groupingHandler); + groupingHandler.stop(); + } + stopComponent(clusterManager); + + freezeConnections(); + } + + activation.postConnectionFreeze(); + + closeAllServerSessions(criticalIOError); + + // ************************************************************************************************************* + // There's no need to sync this part of the method, since the state stopped | stopping is checked within the sync + // + // we can't synchronized the whole method here as that would cause a deadlock + // so stop is checking for stopped || stopping inside the lock + // which will be already enough to guarantee that no other thread will be accessing this method here. + // + // ************************************************************************************************************* + + if (storageManager != null) + storageManager.clearContext(); + + //before we stop any components deactivate any callbacks + callDeActiveCallbacks(); + + // Stop the deployers + if (configuration.isFileDeploymentEnabled()) + { + stopComponent(basicUserCredentialsDeployer); + stopComponent(addressSettingsDeployer); + stopComponent(queueDeployer); + stopComponent(securityDeployer); + stopComponent(deploymentManager); + } + + stopComponent(backupManager); + activation.preStorageClose(); + stopComponent(pagingManager); + + if (storageManager != null) + storageManager.stop(criticalIOError); + + // We stop remotingService before otherwise we may lock the system in case of a critical IO + // error shutdown + if (remotingService != null) + remotingService.stop(criticalIOError); + + // Stop the management service after the remoting service to ensure all acceptors are deregistered with JMX + if (managementService != null) + managementService.unregisterServer(); + stopComponent(managementService); + + stopComponent(securityManager); + stopComponent(resourceManager); + + stopComponent(postOffice); + + if (scheduledPool != null && !scheduledPoolSupplied) + { + // we just interrupt all running tasks, these are supposed to be pings and the like. + scheduledPool.shutdownNow(); + } + + stopComponent(memoryManager); + + if (threadPool != null && !threadPoolSupplied) + { + threadPool.shutdown(); + try + { + if (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) + { + ActiveMQServerLogger.LOGGER.timedOutStoppingThreadpool(threadPool); + for (Runnable r : threadPool.shutdownNow()) + { + ActiveMQServerLogger.LOGGER.debug("Cancelled the execution of " + r); + } + } + } + catch (InterruptedException e) + { + // Ignore + } + } + + if (!threadPoolSupplied) threadPool = null; + if (!scheduledPoolSupplied) scheduledPool = null; + + if (securityStore != null) + securityStore.stop(); + + pagingManager = null; + securityStore = null; + resourceManager = null; + postOffice = null; + queueFactory = null; + resourceManager = null; + messagingServerControl = null; + memoryManager = null; + backupManager = null; + + sessions.clear(); + + state = SERVER_STATE.STOPPED; + + activationLatch.setCount(1); + + // to display in the log message + SimpleString tempNodeID = getNodeID(); + if (activation != null) + { + activation.close(failoverOnServerShutdown, restarting); + } + if (backupActivationThread != null) + { + + backupActivationThread.join(30000); + if (backupActivationThread.isAlive()) + { + ActiveMQServerLogger.LOGGER.backupActivationDidntFinish(this); + backupActivationThread.interrupt(); + } + } + + stopComponent(nodeManager); + + nodeManager = null; + + addressSettingsRepository.clearListeners(); + + addressSettingsRepository.clearCache(); + + scaledDownNodeIDs.clear(); + + if (identity != null) + { + ActiveMQServerLogger.LOGGER.serverStopped("identity=" + identity + ",version=" + getVersion().getFullVersion(), + tempNodeID); + } + else + { + ActiveMQServerLogger.LOGGER.serverStopped(getVersion().getFullVersion(), tempNodeID); + } + } + + + + public boolean checkLiveIsNotColocated(String nodeId) + { + if (parentServer == null) + { + return true; + } + else + { + return !parentServer.getNodeID().toString().equals(nodeId); + } + } + + /** + * Freeze all connections. + * <p/> + * If replicating, avoid freezing the replication connection. Helper method for + * {@link #stop(boolean, boolean, boolean)}. + */ + private void freezeConnections() + { + activation.freezeConnections(remotingService); + + // after disconnecting all the clients close all the server sessions so any messages in delivery will be cancelled back to the queue + for (ServerSession serverSession : sessions.values()) + { + try + { + serverSession.close(true); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + } + + /** + * We close all the exception in an attempt to let any pending IO to finish to avoid scenarios + * where the send or ACK got to disk but the response didn't get to the client It may still be + * possible to have this scenario on a real failure (without the use of XA) But at least we will + * do our best to avoid it on regular shutdowns + */ + private void closeAllServerSessions(final boolean criticalIOError) + { + if (state != SERVER_STATE.STOPPING) + { + return; + } + for (ServerSession session : sessions.values()) + { + try + { + session.close(true); + } + catch (Exception e) + { + // If anything went wrong with closing sessions.. we should ignore it + // such as transactions.. etc. + ActiveMQServerLogger.LOGGER.errorClosingSessionsWhileStoppingServer(e); + } + } + if (!criticalIOError) + { + for (ServerSession session : sessions.values()) + { + try + { + session.waitContextCompletion(); + } + catch (Exception e) + { + ActiveMQServerLogger.LOGGER.errorClosingSessionsWhileStoppingServer(e); + } + } + } + + } + + static void stopComponent(ActiveMQComponent component) throws Exception + { + if (component != null) + component.stop(); + } + + // ActiveMQServer implementation + // ----------------------------------------------------------- + + public String describe() + { + StringWriter str = new StringWriter(); + PrintWriter out = new PrintWriter(str); + + out.println(ActiveMQMessageBundle.BUNDLE.serverDescribe(identity, getClusterManager().describe())); + + return str.toString(); + } + + public String destroyConnectionWithSessionMetadata(String metaKey, String parameterValue) throws Exception + { + StringBuffer operationsExecuted = new StringBuffer(); + + try + { + operationsExecuted.append("**************************************************************************************************\n"); + operationsExecuted.append(ActiveMQMessageBundle.BUNDLE.destroyConnectionWithSessionMetadataHeader(metaKey, parameterValue) + "\n"); + + Set<ServerSession> allSessions = getSessions(); + + ServerSession sessionFound = null; + for (ServerSession session : allSessions) + { + try + { + String value = session.getMetaData(metaKey); + if (value != null && value.equals(parameterValue)) + { + sessionFound = session; + operationsExecuted.append(ActiveMQMessageBundle.BUNDLE.destroyConnectionWithSessionMetadataClosingConnection(sessionFound.toString()) + "\n"); + RemotingConnection conn = session.getRemotingConnection(); + if (conn != null) + { + conn.fail(ActiveMQMessageBundle.BUNDLE.destroyConnectionWithSessionMetadataSendException(metaKey, parameterValue)); + } + session.close(true); + sessions.remove(session.getName()); + } + } + catch (Throwable e) + { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + } + } + + if (sessionFound == null) + { + operationsExecuted.append(ActiveMQMessageBundle.BUNDLE.destroyConnectionWithSessionMetadataNoSessionFound(metaKey, parameterValue) + "\n"); + } + + operationsExecuted.append("**************************************************************************************************"); + + return operationsExecuted.toString(); + } + finally + { + // This operation is critical for the knowledge of the admin, so we need to add info logs for later knowledge + ActiveMQServerLogger.LOGGER.info(operationsExecuted.toString()); + } + + } + + public void setIdentity(String identity) + { + this.identity = identity; + } + + public String getIdentity() + { + return identity; + } + + public ScheduledExecutorService getScheduledPool() + { + return scheduledPool; + } + + public Configuration getConfiguration() + { + return configuration; + } + + public PagingManager getPagingManager() + { + return pagingManager; + } + + public RemotingService getRemotingService() + { + return remotingService; + } + + public StorageManager getStorageManager() + { + return storageManager; + } + + public ActiveMQSecurityManager getSecurityManager() + { + return securityManager; + } + + public ManagementService getManagementService() + { + return managementService; + } + + public HierarchicalRepository<Set<Role>> getSecurityRepository() + { + return securityRepository; + } + + public NodeManager getNodeManager() + { + return nodeManager; + } + + public HierarchicalRepository<AddressSettings> getAddressSettingsRepository() + { + return addressSettingsRepository; + } + + public DeploymentManager getDeploymentManager() + { + return deploymentManager; + } + + public ResourceManager getResourceManager() + { + return resourceManager; + } + + public Version getVersion() + { + return version; + } + + public boolean isStarted() + { + return state == SERVER_STATE.STARTED; + } + + public ClusterManager getClusterManager() + { + return clusterManager; + } + + public BackupManager getBackupManager() + { + return backupManager; + } + + public ServerSession createSession(final String name, + final String username, + final String password, + final int minLargeMessageSize, + final RemotingConnection connection, + final boolean autoCommitSends, + final boolean autoCommitAcks, + final boolean preAcknowledge, + final boolean xa, + final String defaultAddress, + final SessionCallback callback, + final ServerSessionFactory sessionFactory) throws Exception + { + + if (securityStore != null) + { + securityStore.authenticate(username, password); + } + final OperationContext context = storageManager.newContext(getExecutorFactory().getExecutor()); + final ServerSessionImpl session = internalCreateSession(name, username, password, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, sessionFactory); + + sessions.put(name, session); + + return session; + } + + protected ServerSessionImpl internalCreateSession(String name, String username, String password, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback, OperationContext context, ServerSessionFactory sessionFactory) throws Exception + { + if (sessionFactory == null) + { + return new ServerSessionImpl(name, + username, + password, + minLargeMessageSize, + autoCommitSends, + autoCommitAcks, + preAcknowledge, + configuration.isPersistDeliveryCountBeforeDelivery(), + xa, + connection, + storageManager, + postOffice, + resourceManager, + securityStore, + managementService, + this, + configuration.getManagementAddress(), + defaultAddress == null ? null + : new SimpleString(defaultAddress), + callback, + context); + } + else + { + return sessionFactory.createCoreSession(name, + username, + password, + minLargeMessageSize, + autoCommitSends, + autoCommitAcks, + preAcknowledge, + configuration.isPersistDeliveryCountBeforeDelivery(), + xa, + connection, + storageManager, + postOffice, + resourceManager, + securityStore, + managementService, + this, + configuration.getManagementAddress(), + defaultAddress == null ? null + : new SimpleString(defaultAddress), + callback, + context); + } + } + + @Override + public SecurityStore getSecurityStore() + { + return securityStore; + } + + public void removeSession(final String name) throws Exception + { + sessions.remove(name); + } + + public ServerSession lookupSession(String key, String value) + { + // getSessions is called here in a try to minimize locking the Server while this check is being done + Set<ServerSession> allSessions = getSessions(); + + for (ServerSession session : allSessions) + { + String metaValue = session.getMetaData(key); + if (metaValue != null && metaValue.equals(value)) + { + return session; + } + } + + return null; + } + + public synchronized List<ServerSession> getSessions(final String connectionID) + { + Set<Entry<String, ServerSession>> sessionEntries = sessions.entrySet(); + List<ServerSession> matchingSessions = new ArrayList<ServerSession>(); + for (Entry<String, ServerSession> sessionEntry : sessionEntries) + { + ServerSession serverSession = sessionEntry.getValue(); + if (serverSession.getConnectionID().toString().equals(connectionID)) + { + matchingSessions.add(serverSession); + } + } + return matchingSessions; + } + + public synchronized Set<ServerSession> getSessions() + { + return new HashSet<ServerSession>(sessions.values()); + } + + @Override + public boolean isActive() + { + return activationLatch.getCount() < 1; + } + + @Override + public boolean waitForActivation(long timeout, TimeUnit unit) throws InterruptedException + { + return activationLatch.await(timeout, unit); + } + + + public ActiveMQServerControlImpl getActiveMQServerControl() + { + return messagingServerControl; + } + + public int getConnectionCount() + { + return remotingService.getConnections().size(); + } + + public PostOffice getPostOffice() + { + return postOffice; + } + + public QueueFactory getQueueFactory() + { + return queueFactory; + } + + public SimpleString getNodeID() + { + return nodeManager == null ? null : nodeManager.getNodeId(); + } + + public Queue createQueue(final SimpleString address, + final SimpleString queueName, + final SimpleString filterString, + final boolean durable, + final boolean temporary) throws Exception + { + return createQueue(address, queueName, filterString, durable, temporary, false, false); + } + + + /** + * Creates a transient queue. A queue that will exist as long as there are consumers. + * The queue will be deleted as soon as all the consumers are removed. + * <p/> + * Notice: the queue won't be deleted until the first consumer arrives. + * + * @param address + * @param name + * @param filterString + * @param durable + * @throws Exception + */ + public void createSharedQueue(final SimpleString address, + final SimpleString name, + final SimpleString filterString, + boolean durable) throws Exception + { + Queue queue = createQueue(address, name, filterString, durable, !durable, true, !durable); + + if (!queue.getAddress().equals(address)) + { + throw ActiveMQMessageBundle.BUNDLE.queueSubscriptionBelongsToDifferentAddress(name); + } + + if (filterString != null && (queue.getFilter() == null || !queue.getFilter().getFilterString().equals(filterString)) || + filterString == null && queue.getFilter() != null) + { + throw ActiveMQMessageBundle.BUNDLE.queueSubscriptionBelongsToDifferentFilter(name); + } + + if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) + { + ActiveMQServerLogger.LOGGER.debug("Transient Queue " + name + " created on address " + name + + " with filter=" + filterString); + } + + } + + + public Queue locateQueue(SimpleString queueName) throws Exception + { + Binding binding = postOffice.getBinding(queueName); + + if (binding == null) + { + return null; + } + + Bindable queue = binding.getBindable(); + + if (!(queue instanceof Queue)) + { + throw new IllegalStateException("locateQueue should only be used to locate queues"); + } + + return (Queue) binding.getBindable(); + } + + public Queue deployQueue(final SimpleString address, + final SimpleString queueName, + final SimpleString filterString, + final boolean durable, + final boolean temporary) throws Exception + { + ActiveMQServerLogger.LOGGER.deployQueue(queueName); + + return createQueue(address, queueName, filterString, durable, temporary, true, false); + } + + public void destroyQueue(final SimpleString queueName) throws Exception + { + // The session is passed as an argument to verify if the user has authorization to delete the queue + // in some cases (such as temporary queues) this should happen regardless of the authorization + // since that will only happen during a session close, which will be used to cleanup on temporary queues + destroyQueue(queueName, null, true); + } + + public void destroyQueue(final SimpleString queueName, final ServerSession session) throws Exception + { + destroyQueue(queueName, session, true); + } + + public void destroyQueue(final SimpleString queueName, final ServerSession session, final boolean checkConsumerCount) throws Exception + { + destroyQueue(queueName, session, checkConsumerCount, false); + } + + public void destroyQueue(final SimpleString queueName, final ServerSession session, final boolean checkConsumerCount, final boolean removeConsumers) throws Exception + { + addressSettingsRepository.clearCache(); + + Binding binding = postOffice.getBinding(queueName); + + if (binding == null) + { + throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(queueName); + } + + Queue queue = (Queue) binding.getBindable(); + + // This check is only valid if checkConsumerCount == true + if (checkConsumerCount && queue.getConsumerCount() != 0) + { + throw ActiveMQMessageBundle.BUNDLE.cannotDeleteQueue(queue.getName(), queueName, binding.getClass().getName()); + } + + if (session != null) + { + + if (queue.isDurable()) + { + // make sure the user has privileges to delete this queue + securityStore.check(binding.getAddress(), CheckType.DELETE_DURABLE_QUEUE, session); + } + else + { + securityStore.check(binding.getAddress(), CheckType.DELETE_NON_DURABLE_QUEUE, session); + } + } + + queue.deleteQueue(removeConsumers); + } + + + public void registerActivateCallback(final ActivateCallback callback) + { + activateCallbacks.add(callback); + } + + public void unregisterActivateCallback(final ActivateCallback callback) + { + activateCallbacks.remove(callback); + } + + public ExecutorFactory getExecutorFactory() + { + return executorFactory; + } + + public void setGroupingHandler(final GroupingHandler groupingHandler) + { + if (this.groupingHandler != null && managementService != null) + { + // Removing old groupNotification + managementService.removeNotificationListener(this.groupingHandler); + } + this.groupingHandler = groupingHandler; + if (managementService != null) + { + managementService.addNotificationListener(this.groupingHandler); + } + + } + + public GroupingHandler getGroupingHandler() + { + return groupingHandler; + } + + public ReplicationManager getReplicationManager() + { + return activation.getReplicationManager(); + } + + public ConnectorsService getConnectorsService() + { + return connectorsService; + } + + public void deployDivert(DivertConfiguration config) throws Exception + { + if (config.getName() == null) + { + ActiveMQServerLogger.LOGGER.divertWithNoName(); + + return; + } + + if (config.getAddress() == null) + { + ActiveMQServerLogger.LOGGER.divertWithNoAddress(); + + return; + } + + if (config.getForwardingAddress() == null) + { + ActiveMQServerLogger.LOGGER.divertWithNoForwardingAddress(); + + return; + } + + SimpleString sName = new SimpleString(config.getName()); + + if (postOffice.getBinding(sName) != null) + { + ActiveMQServerLogger.LOGGER.divertBindingNotExists(sName); + + return; + } + + SimpleString sAddress = new SimpleString(config.getAddress()); + + Transformer transformer = instantiateTransformer(config.getTransformerClassName()); + + Filter filter = FilterImpl.createFilter(config.getFilterString()); + + Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()), + sName, + new SimpleString(config.getRoutingName()), + config.isExclusive(), + filter, + transformer, + postOffice, + storageManager); + + Binding binding = new DivertBinding(storageManager.generateID(), sAddress, divert); + + postOffice.addBinding(binding); + + managementService.registerDivert(divert, config); + } + + public void destroyDivert(SimpleString name) throws Exception + { + Binding binding = postOffice.getBinding(name); + if (binding == null) + { + throw ActiveMQMessageBundle.BUNDLE.noBindingForDivert(name); + } + if (!(binding instanceof DivertBinding)) + { + throw ActiveMQMessageBundle.BUNDLE.bindingNotDivert(name); + } + + postOffice.removeBinding(name, null); + } + + public void deployBridge(BridgeConfiguration config) throws Exception + { + if (clusterManager != null) + { + clusterManager.deployBridge(config); + } + } + + public void destroyBridge(String name) throws Exception + { + if (clusterManager != null) + { + clusterManager.destroyBridge(name); + } + } + + public ServerSession getSessionByID(String sessionName) + { + return sessions.get(sessionName); + } + + // PUBLIC ------- + + @Override + public String toString() + { + if (identity != null) + { + return "ActiveMQServerImpl::" + identity; + } + return "ActiveMQServerImpl::" + (nodeManager != null ? "serverUUID=" + nodeManager.getUUID() : ""); + } + + /** + * For tests only, don't use this method as it's not part of the API + * + * @param factory + */ + public void replaceQueueFactory(QueueFactory factory) + { + this.queueFactory = factory; + } + + + private PagingManager createPagingManager() + { + + return new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, configuration.getPagingDirectory(), + configuration.getJournalBufferTimeout_NIO(), + scheduledPool, + executorFactory, + configuration.isJournalSyncNonTransactional(), + shutdownOnCriticalIO), + addressSettingsRepository); + } + + /** + * This method is protected as it may be used as a hook for creating a custom storage manager (on tests for instance) + */ + private StorageManager createStorageManager() + { + if (configuration.isPersistenceEnabled()) + { + return new JournalStorageManager(configuration, executorFactory, shutdownOnCriticalIO); + } + return new NullStorageManager(); + } + + private void callActivateCallbacks() + { + for (ActivateCallback callback : activateCallbacks) + { + callback.activated(); + } + } + + private void callPreActiveCallbacks() + { + for (ActivateCallback callback : activateCallbacks) + { + callback.preActivate(); + } + } + + private void callDeActiveCallbacks() + { + for (ActivateCallback callback : activateCallbacks) + { + try + { + callback.deActivate(); + } + catch (Throwable e) + { + // https://bugzilla.redhat.com/show_bug.cgi?id=1009530: + // we won't interrupt the shutdown sequence because of a failed callback here + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + } + } + } + + private void callActivationCompleteCallbacks() + { + for (ActivateCallback callback : activateCallbacks) + { + callback.activationComplete(); + } + } + + /** + * Sets up ActiveMQ Executor Services. + */ + private void initializeExecutorServices() + { + /* We check to see if a Thread Pool is supplied in the InjectedObjectRegistry. If so we created a new Ordered + * Executor based on the provided Thread pool. Otherwise we create a new ThreadPool. + */ + if (serviceRegistry.getExecutorService() == null) + { + ThreadFactory tFactory = new ActiveMQThreadFactory("ActiveMQ-server-" + this.toString(), false, getThisClassLoader()); + if (configuration.getThreadPoolMaxSize() == -1) + { + threadPool = Executors.newCachedThreadPool(tFactory); + } + else + { + threadPool = Executors.newFixedThreadPool(configuration.getThreadPoolMaxSize(), tFactory); + } + } + else + { + threadPool = serviceRegistry.getExecutorService(); + this.threadPoolSupplied = true; + } + this.executorFactory = new OrderedExecutorFactory(threadPool); + + /* We check to see if a Scheduled Executor Service is provided in the InjectedObjectRegistry. If so we use this + * Scheduled ExecutorService otherwise we create a new one. + */ + if (serviceRegistry.getScheduledExecutorService() == null) + { + ThreadFactory tFactory = new ActiveMQThreadFactory("ActiveMQ-scheduled-threads", false, getThisClassLoader()); + scheduledPool = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), tFactory); + } + else + { + this.scheduledPoolSupplied = true; + this.scheduledPool = serviceRegistry.getScheduledExecutorService(); + } + } + + public ServiceRegistry getServiceRegistry() + { + return serviceRegistry; + } + + /** + * Starts everything apart from RemotingService and loading the data. + * <p/> + * After optional intermediary steps, Part 1 is meant to be followed by part 2 + * {@link #initialisePart2(boolean)}. + * @param scalingDown + */ + synchronized boolean initialisePart1(boolean scalingDown) throws Exception + { + if (state == SERVER_STATE.STOPPED) + return false; + + // Create the pools - we have two pools - one for non scheduled - and another for scheduled + initializeExecutorServices(); + + if (configuration.getJournalType() == JournalType.ASYNCIO && !AIOSequentialFileFactory.isSupported()) + { + ActiveMQServerLogger.LOGGER.switchingNIO(); + configuration.setJournalType(JournalType.NIO); + } + + managementService = new ManagementServiceImpl(mbeanServer, configuration); + + if (configuration.getMemoryMeasureInterval() != -1) + { + memoryManager = new MemoryManager(configuration.getMemoryWarningThreshold(), + configuration.getMemoryMeasureInterval()); + + memoryManager.start(); + } + + // Create the hard-wired components + + if (configuration.isFileDeploymentEnabled()) + { + deploymentManager = new FileDeploymentManager(configuration.getFileDeployerScanPeriod()); + } + + callPreActiveCallbacks(); + + // startReplication(); + + storageManager = createStorageManager(); + + if (ActiveMQDefaultConfiguration.getDefaultClusterUser().equals(configuration.getClusterUser()) && ActiveMQDefaultConfiguration.getDefaultClusterPassword().equals(configuration.getClusterPassword())) + { + ActiveMQServerLogger.LOGGER.clusterSecurityRisk(); + } + + securityStore = new SecurityStoreImpl(securityRepository, + securityManager, + configuration.getSecurityInvalidationInterval(), + configuration.isSecurityEnabled(), + configuration.getClusterUser(), + configuration.getClusterPassword(), + managementService); + + queueFactory = new QueueFactoryImpl(executorFactory, scheduledPool, addressSettingsRepository, storageManager); + + pagingManager = createPagingManager(); + + resourceManager = new ResourceManagerImpl((int) (configuration.getTransactionTimeout() / 1000), + configuration.getTransactionTimeoutScanPeriod(), + scheduledPool); + postOffice = new PostOfficeImpl(this, + storageManager, + pagingManager, + queueFactory, + managementService, + configuration.getMessageExpiryScanPeriod(), + configuration.getMessageExpiryThreadPriority(), + configuration.isWildcardRoutingEnabled(), + configuration.getIDCacheSize(), + configuration.isPersistIDCache(), + addressSettingsRepository); + + // This can't be created until node id is set + clusterManager = + new ClusterManager(executorFactory, this, postOffice, scheduledPool, managementService, configuration, + nodeManager, haPolicy.isBackup()); + + backupManager = new BackupManager(this, executorFactory, scheduledPool, nodeManager, configuration, clusterManager); + + clusterManager.deploy(); + + remotingService = new RemotingServiceImpl(clusterManager, + configuration, + this, + managementService, + scheduledPool, + protocolManagerFactories, + executorFactory.getExecutor(), + serviceRegistry); + + messagingServerControl = managementService.registerServer(postOffice, + storageManager, + configuration, + addressSettingsRepository, + securityRepository, + resourceManager, + remotingService, + this, + queueFactory, + scheduledPool, + pagingManager, + haPolicy.isBackup()); + + // Address settings need to deployed initially, since they're require on paging manager.start() + + if (!scalingDown) + { + if (configuration.isFileDeploymentEnabled()) + { + addressSettingsDeployer = new AddressSettingsDeployer(deploymentManager, addressSettingsRepository); + + addressSettingsDeployer.start(); + } + + deployAddressSettingsFromConfiguration(); + } + + storageManager.start(); + + + if (securityManager != null) + { + securityManager.start(); + } + + postOffice.start(); + + pagingManager.start(); + + managementService.start(); + + resourceManager.start(); + + // Deploy all security related config + if (configuration.isFileDeploymentEnabled()) + { + basicUserCredentialsDeployer = new BasicUserCredentialsDeployer(deploymentManager, securityManager); + + basicUserCredentialsDeployer.start(); + + if (securityManager != null) + { + securityDeployer = new SecurityDeployer(deploymentManager, securityRepository); + + securityDeployer.start(); + } + } + + deploySecurityFromConfiguration(); + + deployGroupingHandlerConfiguration(configuration.getGroupingHandlerConfiguration()); + + return true; + } + + /* + * Load the data, and start remoting service so clients can connect + */ + synchronized void initialisePart2(boolean scalingDown) throws Exception + { + // Load the journal and populate queues, transactions and caches in memory + + if (state == SERVER_STATE.STOPPED || state == SERVER_STATE.STOPPING) + { + return; + } + + pagingManager.reloadStores(); + + JournalLoadInformation[] journalInfo = loadJournals(); + + + final ServerInfo dumper = new ServerInfo(this, pagingManager); + + long dumpInfoInterval = configuration.getServerDumpInterval(); + + if (dumpInfoInterval > 0) + { + scheduledPool.scheduleWithFixedDelay(new Runnable() + { + public void run() + { + ActiveMQServerLogger.LOGGER.dumpServerInfo(dumper.dump()); + } + }, 0, dumpInfoInterval, TimeUnit.MILLISECONDS); + } + + // Deploy the rest of the stuff + + // Deploy any predefined queues + if (configuration.isFileDeploymentEnabled()) + { + queueDeployer = new QueueDeployer(deploymentManager, this); + + queueDeployer.start(); + } + else + { + deployQueuesFromConfiguration(); + } + + + // We need to call this here, this gives any dependent server a chance to deploy its own addresses + // this needs to be done before clustering is fully activated + callActivateCallbacks(); + + if (!scalingDown) + { + // Deploy any pre-defined diverts + deployDiverts(); + + if (groupingHandler != null) + { + groupingHandler.start(); + } + + // We do this at the end - we don't want things like MDBs or other connections connecting to a backup server until + // it is activated + + if (groupingHandler != null && groupingHandler instanceof LocalGroupingHandler) + { + clusterManager.start(); + + groupingHandler.awaitBindings(); + + remotingService.start(); + } + else + { + remotingService.start(); + + clusterManager.start(); + } + + if (nodeManager.getNodeId() == null) + { + throw ActiveMQMessageBundle.BUNDLE.nodeIdNull(); + } + activationLatch.countDown(); + + // We can only do this after everything is started otherwise we may get nasty races with expired messages + postOffice.startExpiryScanner(); + } + else + { + activationLatch.countDown(); + } + + callActivationCompleteCallbacks(); + } + + private void deploySecurityFromConfiguration() + { + for (Map.Entry<String, Set<Role>> entry : configuration.getSecurityRoles().entrySet()) + { + securityRepository.addMatch(entry.getKey(), entry.getValue(), true); + } + } + + private void deployQueuesFromConfiguration() throws Exception + { + for (CoreQueueConfiguration config : configuration.getQueueConfigurations()) + { + deployQueue(SimpleString.toSimpleString(config.getAddress()), + SimpleString.toSimpleString(config.getName()), + SimpleString.toSimpleString(config.getFilterString()), + config.isDurable(), + false); + } + } + + private void deployAddressSettingsFromConfiguration() + { + for (Map.Entry<String, AddressSettings> entry : configuration.getAddressesSettings().entrySet()) + { + addressSettingsRepository.addMatch(entry.getKey(), entry.getValue(), true); + } + } + + private JournalLoadInformation[] loadJournals() throws Exception + { + JournalLoader journalLoader = activation.createJournalLoader(postOffice, + pagingManager, + storageManager, + queueFactory, + nodeManager, + managementService, + groupingHandler, + configuration, + parentServer); + + JournalLoadInformation[] journalInfo = new JournalLoadInformation[2]; + + List<QueueBindingInfo> queueBindingInfos = new ArrayList(); + + List<GroupingInfo> groupingInfos = new ArrayList(); + + journalInfo[0] = storageManager.loadBindingJournal(queueBindingInfos, groupingInfos); + + recoverStoredConfigs(); + + Map<Long, QueueBindingInfo> queueBindingInfosMap = new HashMap(); + + + journalLoader.initQueues(queueBindingInfosMap, queueBindingInfos); + + journalLoader.handleGroupingBindings(groupingInfos); + + Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<byte[], Long>>>(); + + HashSet<Pair<Long, Long>> pendingLargeMessages = new HashSet<Pair<Long, Long>>(); + + List<PageCountPending> pendingNonTXPageCounter = new LinkedList<PageCountPending>(); + + journalInfo[1] = storageManager.loadMessageJournal(postOffice, + pagingManager, + resourceManager, + queueBindingInfosMap, + duplicateIDMap, + pendingLargeMessages, + pendingNonTXPageCounter, + journalLoader); + + journalLoader.handleDuplicateIds(duplicateIDMap); + + for (Pair<Long, Long> msgToDelete : pendingLargeMessages) + { + ActiveMQServerLogger.LOGGER.deletingPendingMessage(msgToDelete); + LargeServerMessage msg = storageManager.createLargeMessage(); + msg.setMessageID(msgToDelete.getB()); + msg.setPendingRecordID(msgToDelete.getA()); + msg.setDurable(true); + msg.deleteFile(); + } + + if (pendingNonTXPageCounter.size() != 0) + { + try + { + journalLoader.recoverPendingPageCounters(pendingNonTXPageCounter); + } + catch (Throwable e) + { + ActiveMQServerLogger.LOGGER.errorRecoveringPageCounter(e); + } + } + + journalLoader.cleanUp(); + + return journalInfo; + } + + + /** + * @throws Exception + */ + private void recoverStoredConfigs() throws Exception + { + List<PersistedAddressSetting> adsettings = storageManager.recoverAddressSettings(); + for (PersistedAddressSetting set : adsettings) + { + addressSettingsRepository.addMatch(set.getAddressMatch().toString(), set.getSetting()); + } + + List<PersistedRoles> roles = storageManager.recoverPersistedRoles(); + + for (PersistedRoles roleItem : roles) + { + Set<Role> setRoles = SecurityFormatter.createSecurity(roleItem.getSendRoles(), + roleItem.getConsumeRoles(), + roleItem.getCreateDurableQueueRoles(), + roleItem.getDeleteDurableQueueRoles(), + roleItem.getCreateNonDurableQueueRoles(), + roleItem.getDeleteNonDurableQueueRoles(), + roleItem.getManageRoles()); + + securityRepository.addMatch(roleItem.getAddressMatch().toString(), setRoles); + } + } + + private Queue createQueue(final SimpleString address, + final SimpleString queueName, + final SimpleString filterString, + final boolean durable, + final boolean temporary, + final boolean ignoreIfExists, + final boolean transientQueue) throws Exception + { + QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName); + + if (binding != null) + { + if (ignoreIfExists) + { + return binding.getQueue(); + } + else + { + throw ActiveMQMessageBundle.BUNDLE.queueAlreadyExists(queueName); + } + } + + Filter filter = FilterImpl.createFilter(filterString); + + long txID = storageManager.generateID(); + long queueID = storageManager.generateID(); + + PageSubscription pageSubscription; + + if (filterString != null && filterString.toString().equals(GENERIC_IGNORED_FILTER)) + { + pageSubscription = null; + } + else + { + pageSubscription = pagingManager.getPageStore(address) + .getCursorProvider() + .createSubscription(queueID, filter, durable); + } + + final Queue queue = queueFactory.createQueue(queueID, + address, + queueName, + filter, + pageSubscription, + durable, + temporary); + + if (transientQueue) + { + queue.setConsumersRefCount(this); + } + + binding = new LocalQueueBinding(address, queue, nodeManager.getNodeId()); + + if (durable) + { + storageManager.addQueueBinding(txID, binding); + } + + try + { + postOffice.addBinding(binding); + if (durable) + { + storageManager.commitBindings(txID); + } + } + catch (Exception e) + { + try + { + if (durable) + { + storageManager.rollbackBindings(txID); + } + if (queue != null) + { + queue.close(); + } + if (pageSubscription != null) + { + pageSubscription.destroy(); + } + } + catch (Throwable ignored) + { + ActiveMQServerLogger.LOGGER.debug(ignored.getMessage(), ignored); + } + throw e; + } + + + managementService.registerAddress(address); + managementService.registerQueue(queue, address, storageManager); + + return queue; + } + + private void deployDiverts() throws Exception + { + for (DivertConfiguration config : configuration.getDivertConfigurations()) + { + deployDivert(config); + } + } + + private void deployGroupingHandlerConfiguration(final GroupingHandlerConfiguration config) throws Exception + { + if (config != null) + { + GroupingHandler groupingHandler1; + if (config.getType() == GroupingHandlerConfiguration.TYPE.LOCAL) + { + groupingHandler1 = + new LocalGroupingHandler(executorFactory, + scheduledPool, + managementService, + config.getName(), + config.getAddress(), + getStorageManager(), + config.getTimeout(), + config.getGroupTimeout(), + config.getReaperPeriod()); + } + else + { + groupingHandler1 = + new RemoteGroupingHandler(executorFactory, managementService, + config.getName(), + config.getAddress(), + config.getTimeout(), + config.getGroupTimeout()); + } + + this.groupingHandler = groupingHandler1; + + managementService.addNotificationListener(groupingHandler1); + } + } + + private Transformer instantiateTransformer(final String transformerClassName) + { + Transformer transformer = null; + + if (transformerClassName != null) + { + transformer = (Transformer) instantiateInstance(transformerClassName); + } + + return transformer; + } + + private Object instantiateInstance(final String className) + { + return safeInitNewInstance(className); + } + + private static ClassLoader getThisClassLoader() + { + return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() + { + public ClassLoader run() + { + return ClientSessionFactoryImpl.class.getClassLoader(); + } + }); + + } + + /** + * Check if journal directory exists or create it (if configured to do so) + */ + void checkJournalDirectory() + { + File journalDir = new File(configuration.getJournalDirectory()); + + if (!journalDir.exists()) + { + if (configuration.isCreateJournalDir()) + { + journalDir.mkdirs(); + } + else + { + throw ActiveMQMessageBundle.BUNDLE.cannotCreateDir(journalDir.getAbsolutePath()); + } + } + } + + + // Inner classes + // -------------------------------------------------------------------------------- + + + + public final class ShutdownOnCriticalErrorListener implements IOCriticalErrorListener + { + boolean failedAlready = false; + + public synchronized void onIOException(Exception cause, String message, SequentialFile file) + { + if (!failedAlready) + { + failedAlready = true; + + ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause); + + stopTheServer(true); + } + } + } + + + /** + * This seems duplicate code all over the place, but for security reasons we can't let something like this to be open in a + * utility class, as it would be a door to load anything you like in a safe VM. + * For that reason any class trying to do a privileged block should do with the AccessController directly. + */ + private static Object safeInitNewInstance(final String className) + { + return AccessController.doPrivileged(new PrivilegedAction<Object>() + { + public Object run() + { + return ClassloadingUtil.newInstanceFromClassLoader(className); + } + }); + } + + public void addProtocolManagerFactory(ProtocolManagerFactory factory) + { + protocolManagerFactories.add(factory); + } + + public void removeProtocolManagerFactory(ProtocolManagerFactory factory) + { + protocolManagerFactories.remove(factory); + } + + @Override + public ActiveMQServer createBackupServer(Configuration configuration) + { + return new ActiveMQServerImpl(configuration, null, securityManager, this); + } + + @Override + public void addScaledDownNode(SimpleString scaledDownNodeId) + { + synchronized (scaledDownNodeIDs) + { + scaledDownNodeIDs.add(scaledDownNodeId); + if (scaledDownNodeIDs.size() > 10) + { + scaledDownNodeIDs.remove(10); + } + } + } + + @Override + public boolean hasScaledDown(SimpleString scaledDownNodeId) + { + return scaledDownNodeIDs.contains(scaledDownNodeId); + } + + + int countNumberOfCopiedJournals() + { + //will use the main journal to check for how many backups have been kept + File journalDir = new File(configuration.getJournalDirectory()); + final String fileName = journalDir.getName(); + int numberOfbackupsSaved = 0; + //fine if it doesn't exist, we aren't using file based persistence so it's no issue + if (journalDir.exists()) + { + File parentFile = new File(journalDir.getParent()); + String[] backupJournals = parentFile.list(new FilenameFilter() + { + @Override + public boolean accept(File dir, String name) + { + return name.startsWith(fileName) && !name.matches(fileName); + } + }); + numberOfbackupsSaved = backupJournals != null ? backupJournals.length : 0; + } + return numberOfbackupsSaved; + } + + /** + * Move data away before starting data synchronization for fail-back. + * <p/> + * Use case is a server, upon restarting, finding a former backup running in its place. It will + * move any older data away and log a warning about it. + */ + void moveServerData() + { + String[] dataDirs = + new String[]{configuration.getBindingsDirectory(), + configuration.getJournalDirectory(), + configuration.getPagingDirectory(), + configuration.getLargeMessagesDirectory()}; + boolean allEmpty = true; + int lowestSuffixForMovedData = 1; + boolean redo = true; + + while (redo) + { + redo = false; + for (String dir : dataDirs) + { + File fDir = new File(dir); + if (fDir.exists()) + { + if (!fDir.isDirectory()) + { + throw ActiveMQMessageBundle.BUNDLE.journalDirIsFile(fDir); + } + + if (fDir.list().length > 0) + allEmpty = false; + } + + String sanitizedPath = fDir.getPath(); + while (new File(sanitizedPath + lowestSuffixForMovedData).exists()) + { + lowestSuffixForMovedData++; + redo = true; + } + } + } + if (allEmpty) + return; + + for (String dir0 : dataDirs) + { + File dir = new File(dir0); + File newPath = new File(dir.getPath() + lowestSuffixForMovedData); + if (dir.exists()) + { + if (!dir.renameTo(newPath)) + { + throw ActiveMQMessageBundle.BUNDLE.couldNotMoveJournal(dir); + } + + ActiveMQServerLogger.LOGGER.backupMovingDataAway(dir0, newPath.getPath()); + } + /* + * sometimes OS's can hold on to file handles for a while so we need to check this actually qorks and then wait + * a while and try again if it doesn't + * */ + + File dirToRecreate = new File(dir0); + int count = 0; + while (!dirToRecreate.exists() && !dirToRecreate.mkdir()) + { + try + { + Thread.sleep(1000); + } + catch (InterruptedException e) + { + } + count++; + if (count == 5) + { + throw ActiveMQMessageBundle.BUNDLE.cannotCreateDir(dir.getPath()); + } + } + } + } +}
