Repository: activemq-6 Updated Branches: refs/heads/master 881f39ce4 -> 2ae6f36e7
ACTIVEMQ6-52 Graceful shutdown Implements a feature whereby the broker will not shutdown while there are clients connected. A timeout can be specified so that even if there are clients connected the broker will still shutdown after a certain time. Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/754d481d Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/754d481d Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/754d481d Branch: refs/heads/master Commit: 754d481d53975e2baded78ed7fea2ee955c44be9 Parents: 881f39c Author: jbertram <[email protected]> Authored: Mon Jan 19 09:34:09 2015 -0600 Committer: jbertram <[email protected]> Committed: Wed Jan 21 13:18:55 2015 -0600 ---------------------------------------------------------------------- .../config/ActiveMQDefaultConfiguration.java | 22 +++ .../activemq/core/config/Configuration.java | 22 +++ .../core/config/impl/ConfigurationImpl.java | 26 +++ .../deployers/impl/FileConfigurationParser.java | 5 + .../core/remoting/server/RemotingService.java | 8 + .../server/impl/RemotingServiceImpl.java | 23 ++- .../core/server/impl/ActiveMQServerImpl.java | 19 ++ .../resources/schema/activemq-configuration.xsd | 16 ++ .../impl/DefaultsFileConfigurationTest.java | 4 + .../core/config/impl/FileConfigurationTest.java | 2 + .../resources/ConfigurationTest-full-config.xml | 2 + docs/user-manual/en/SUMMARY.md | 1 + docs/user-manual/en/configuration-index.md | 16 ++ docs/user-manual/en/graceful-shutdown.md | 21 ++ .../server/GracefulShutdownTest.java | 190 +++++++++++++++++++ 15 files changed, 375 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-6/blob/754d481d/activemq-core-client/src/main/java/org/apache/activemq/api/config/ActiveMQDefaultConfiguration.java ---------------------------------------------------------------------- diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/config/ActiveMQDefaultConfiguration.java b/activemq-core-client/src/main/java/org/apache/activemq/api/config/ActiveMQDefaultConfiguration.java index 01e789d..4049fb4 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/api/config/ActiveMQDefaultConfiguration.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/api/config/ActiveMQDefaultConfiguration.java @@ -153,6 +153,12 @@ public final class ActiveMQDefaultConfiguration // true means that security is enabled private static boolean DEFAULT_SECURITY_ENABLED = true; + // true means that graceful shutdown is enabled + private static boolean DEFAULT_GRACEFUL_SHUTDOWN_ENABLED = false; + + // how long (in ms) to wait before forcing the server to stop even if clients are still connected (i.e circumventing graceful shutdown) + private static long DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = -1; + // how long (in ms) to wait before invalidating the security cache private static long DEFAULT_SECURITY_INVALIDATION_INTERVAL = 10000; @@ -449,6 +455,22 @@ public final class ActiveMQDefaultConfiguration } /** + * true means that graceful shutdown is enabled + */ + public static boolean isDefaultGracefulShutdownEnabled() + { + return DEFAULT_GRACEFUL_SHUTDOWN_ENABLED; + } + + /** + * true means that graceful shutdown is enabled + */ + public static long getDefaultGracefulShutdownTimeout() + { + return DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT; + } + + /** * how long (in ms) to wait before invalidating the security cache */ public static long getDefaultSecurityInvalidationInterval() http://git-wip-us.apache.org/repos/asf/activemq-6/blob/754d481d/activemq-server/src/main/java/org/apache/activemq/core/config/Configuration.java ---------------------------------------------------------------------- diff --git a/activemq-server/src/main/java/org/apache/activemq/core/config/Configuration.java b/activemq-server/src/main/java/org/apache/activemq/core/config/Configuration.java index 72b29dd..c814979 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/config/Configuration.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/config/Configuration.java @@ -132,6 +132,28 @@ public interface Configuration extends Serializable Configuration setSecurityEnabled(boolean enabled); /** + * Returns whether graceful shutdown is enabled for this server. <br> + * Default value is {@value org.apache.activemq.api.config.ActiveMQDefaultConfiguration#DEFAULT_SECURITY_ENABLED}. + */ + boolean isGracefulShutdownEnabled(); + + /** + * Sets whether security is enabled for this server. + */ + Configuration setGracefulShutdownEnabled(boolean enabled); + + /** + * Returns the graceful shutdown timeout for this server. <br> + * Default value is {@value org.apache.activemq.api.config.ActiveMQDefaultConfiguration#DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT}. + */ + long getGracefulShutdownTimeout(); + + /** + * Sets the graceful shutdown timeout + */ + Configuration setGracefulShutdownTimeout(long timeout); + + /** * Returns whether this server is manageable using JMX or not. <br> * Default value is {@value org.apache.activemq.api.config.ActiveMQDefaultConfiguration#DEFAULT_JMX_MANAGEMENT_ENABLED}. */ http://git-wip-us.apache.org/repos/asf/activemq-6/blob/754d481d/activemq-server/src/main/java/org/apache/activemq/core/config/impl/ConfigurationImpl.java ---------------------------------------------------------------------- diff --git a/activemq-server/src/main/java/org/apache/activemq/core/config/impl/ConfigurationImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/config/impl/ConfigurationImpl.java index cf6354a..3c39a8b 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/config/impl/ConfigurationImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/config/impl/ConfigurationImpl.java @@ -78,6 +78,10 @@ public class ConfigurationImpl implements Configuration private boolean securityEnabled = ActiveMQDefaultConfiguration.isDefaultSecurityEnabled(); + private boolean gracefulShutdownEnabled = ActiveMQDefaultConfiguration.isDefaultGracefulShutdownEnabled(); + + private long gracefulShutdownTimeout = ActiveMQDefaultConfiguration.getDefaultGracefulShutdownTimeout(); + protected boolean jmxManagementEnabled = ActiveMQDefaultConfiguration.isDefaultJmxManagementEnabled(); protected String jmxDomain = ActiveMQDefaultConfiguration.getDefaultJmxDomain(); @@ -738,6 +742,28 @@ public class ConfigurationImpl implements Configuration return this; } + public boolean isGracefulShutdownEnabled() + { + return gracefulShutdownEnabled; + } + + public ConfigurationImpl setGracefulShutdownEnabled(final boolean enabled) + { + gracefulShutdownEnabled = enabled; + return this; + } + + public long getGracefulShutdownTimeout() + { + return gracefulShutdownTimeout; + } + + public ConfigurationImpl setGracefulShutdownTimeout(final long timeout) + { + gracefulShutdownTimeout = timeout; + return this; + } + public boolean isJMXManagementEnabled() { return jmxManagementEnabled; http://git-wip-us.apache.org/repos/asf/activemq-6/blob/754d481d/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java ---------------------------------------------------------------------- diff --git a/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java b/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java index 996d740..8ebea6e 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/deployers/impl/FileConfigurationParser.java @@ -219,6 +219,11 @@ public final class FileConfigurationParser extends XMLConfigurationUtil config.setSecurityEnabled(getBoolean(e, "security-enabled", config.isSecurityEnabled())); + config.setGracefulShutdownEnabled(getBoolean(e, "graceful-shutdown-enabled", config.isGracefulShutdownEnabled())); + + config.setGracefulShutdownTimeout(getLong(e, "graceful-shutdown-timeout", + config.getGracefulShutdownTimeout(), Validators.MINUS_ONE_OR_GE_ZERO)); + config.setJMXManagementEnabled(getBoolean(e, "jmx-management-enabled", config.isJMXManagementEnabled())); config.setJMXDomain(getString(e, "jmx-domain", config.getJMXDomain(), Validators.NOT_NULL_OR_EMPTY)); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/754d481d/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/RemotingService.java ---------------------------------------------------------------------- diff --git a/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/RemotingService.java b/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/RemotingService.java index c12ce3e..0eaf6e2 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/RemotingService.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/RemotingService.java @@ -23,6 +23,7 @@ import org.apache.activemq.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.core.security.ActiveMQPrincipal; import org.apache.activemq.spi.core.protocol.RemotingConnection; import org.apache.activemq.spi.core.remoting.Acceptor; +import org.apache.activemq.utils.ReusableLatch; /** * @author <a href="mailto:[email protected]">Jeff Mesnil</a> @@ -43,6 +44,8 @@ public interface RemotingService Set<RemotingConnection> getConnections(); + ReusableLatch getConnectionCountLatch(); + void addIncomingInterceptor(Interceptor interceptor); void addOutgoingInterceptor(Interceptor interceptor); @@ -67,6 +70,11 @@ public interface RemotingService void allowInvmSecurityOverride(ActiveMQPrincipal principal); /** + * Pauses the acceptors so that no more connections can be made to the server + */ + void pauseAcceptors(); + + /** * Freezes and then disconnects all connections except the given one and tells the client where else * it might connect (only applicable if server is in a cluster and uses scaleDown-on-failover=true). * http://git-wip-us.apache.org/repos/asf/activemq-6/blob/754d481d/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java ---------------------------------------------------------------------- diff --git a/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java index 4df5b7a..9013995 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/remoting/server/impl/RemotingServiceImpl.java @@ -68,6 +68,7 @@ import org.apache.activemq.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.utils.ClassloadingUtil; import org.apache.activemq.utils.ConfigurationHelper; import org.apache.activemq.utils.ActiveMQThreadFactory; +import org.apache.activemq.utils.ReusableLatch; /** * @author <a href="mailto:[email protected]">Jeff Mesnil</a> @@ -97,6 +98,8 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle private final Map<Object, ConnectionEntry> connections = new ConcurrentHashMap<Object, ConnectionEntry>(); + private final ReusableLatch connectionCountLatch = new ReusableLatch(0); + private final ActiveMQServer server; private final ManagementService managementService; @@ -355,11 +358,10 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle } } - public synchronized void freeze(final String scaleDownNodeID, final CoreRemotingConnection connectionToKeepOpen) + public synchronized void pauseAcceptors() { if (!started) return; - failureCheckAndFlushThread.close(false); for (Acceptor acceptor : acceptors.values()) { @@ -372,6 +374,13 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle ActiveMQServerLogger.LOGGER.errorStoppingAcceptor(); } } + } + + public synchronized void freeze(final String scaleDownNodeID, final CoreRemotingConnection connectionToKeepOpen) + { + if (!started) + return; + failureCheckAndFlushThread.close(false); HashMap<Object, ConnectionEntry> connectionEntries = new HashMap<Object, ConnectionEntry>(connections); // Now we ensure that no connections will process any more packets after this method is @@ -392,6 +401,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle { conn.disconnect(scaleDownNodeID, false); connections.remove(entry.getKey()); + connectionCountLatch.countDown(); } } } @@ -444,6 +454,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle acceptors.clear(); connections.clear(); + connectionCountLatch.setCount(0); if (managementService != null) { @@ -497,6 +508,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle public RemotingConnection removeConnection(final Object remotingConnectionID) { ConnectionEntry entry = connections.remove(remotingConnectionID); + connectionCountLatch.countDown(); if (entry != null) { @@ -522,6 +534,11 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle return conns; } + public synchronized ReusableLatch getConnectionCountLatch() + { + return connectionCountLatch; + } + // ConnectionLifeCycleListener implementation ----------------------------------- private ProtocolManager getProtocolManager(String protocol) @@ -551,6 +568,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle } connections.put(connection.getID(), entry); + connectionCountLatch.countUp(); } public void connectionDestroyed(final Object connectionID) @@ -587,6 +605,7 @@ public class RemotingServiceImpl implements RemotingService, ConnectionLifeCycle if (empty) { connections.remove(connectionID); + connectionCountLatch.countDown(); conn.connection.destroy(); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/754d481d/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java index b1f237d..cb91351 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java @@ -625,6 +625,25 @@ public class ActiveMQServerImpl implements ActiveMQServer } stopComponent(clusterManager); + if (remotingService != null) + { + remotingService.pauseAcceptors(); + } + + // allows for graceful shutdown + if (remotingService != null && configuration.isGracefulShutdownEnabled()) + { + long timeout = configuration.getGracefulShutdownTimeout(); + if (timeout == -1) + { + remotingService.getConnectionCountLatch().await(); + } + else + { + remotingService.getConnectionCountLatch().await(timeout); + } + } + freezeConnections(); } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/754d481d/activemq-server/src/main/resources/schema/activemq-configuration.xsd ---------------------------------------------------------------------- diff --git a/activemq-server/src/main/resources/schema/activemq-configuration.xsd b/activemq-server/src/main/resources/schema/activemq-configuration.xsd index 7ef6023..723cf57 100644 --- a/activemq-server/src/main/resources/schema/activemq-configuration.xsd +++ b/activemq-server/src/main/resources/schema/activemq-configuration.xsd @@ -69,6 +69,22 @@ </xsd:annotation> </xsd:element> + <xsd:element name="graceful-shutdown-enabled" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0"> + <xsd:annotation> + <xsd:documentation> + true means that graceful shutdown is enabled + </xsd:documentation> + </xsd:annotation> + </xsd:element> + + <xsd:element name="graceful-shutdown-timeout" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0"> + <xsd:annotation> + <xsd:documentation> + how long (in ms) to wait for clients to disconnect before shutting down the server + </xsd:documentation> + </xsd:annotation> + </xsd:element> + <xsd:element name="security-enabled" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0"> <xsd:annotation> <xsd:documentation> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/754d481d/activemq-server/src/test/java/org/apache/activemq/core/config/impl/DefaultsFileConfigurationTest.java ---------------------------------------------------------------------- diff --git a/activemq-server/src/test/java/org/apache/activemq/core/config/impl/DefaultsFileConfigurationTest.java b/activemq-server/src/test/java/org/apache/activemq/core/config/impl/DefaultsFileConfigurationTest.java index 2eb50c3..f064396 100644 --- a/activemq-server/src/test/java/org/apache/activemq/core/config/impl/DefaultsFileConfigurationTest.java +++ b/activemq-server/src/test/java/org/apache/activemq/core/config/impl/DefaultsFileConfigurationTest.java @@ -144,6 +144,10 @@ public class DefaultsFileConfigurationTest extends ConfigurationImplTest conf.getMessageExpiryThreadPriority()); Assert.assertTrue(conf.getHAPolicyConfiguration() instanceof LiveOnlyPolicyConfiguration); + + Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultGracefulShutdownEnabled(), conf.isGracefulShutdownEnabled()); + + Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultGracefulShutdownTimeout(), conf.getGracefulShutdownTimeout()); } // Protected --------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-6/blob/754d481d/activemq-server/src/test/java/org/apache/activemq/core/config/impl/FileConfigurationTest.java ---------------------------------------------------------------------- diff --git a/activemq-server/src/test/java/org/apache/activemq/core/config/impl/FileConfigurationTest.java b/activemq-server/src/test/java/org/apache/activemq/core/config/impl/FileConfigurationTest.java index 7c1a166..182cd7f 100644 --- a/activemq-server/src/test/java/org/apache/activemq/core/config/impl/FileConfigurationTest.java +++ b/activemq-server/src/test/java/org/apache/activemq/core/config/impl/FileConfigurationTest.java @@ -90,6 +90,8 @@ public class FileConfigurationTest extends ConfigurationImplTest Assert.assertEquals(100, conf.getJournalMinFiles()); Assert.assertEquals(123, conf.getJournalCompactMinFiles()); Assert.assertEquals(33, conf.getJournalCompactPercentage()); + Assert.assertEquals(true, conf.isGracefulShutdownEnabled()); + Assert.assertEquals(12345, conf.getGracefulShutdownTimeout()); Assert.assertEquals("largemessagesdir", conf.getLargeMessagesDirectory()); Assert.assertEquals(95, conf.getMemoryWarningThreshold()); http://git-wip-us.apache.org/repos/asf/activemq-6/blob/754d481d/activemq-server/src/test/resources/ConfigurationTest-full-config.xml ---------------------------------------------------------------------- diff --git a/activemq-server/src/test/resources/ConfigurationTest-full-config.xml b/activemq-server/src/test/resources/ConfigurationTest-full-config.xml index 1045e21..79cded1 100644 --- a/activemq-server/src/test/resources/ConfigurationTest-full-config.xml +++ b/activemq-server/src/test/resources/ConfigurationTest-full-config.xml @@ -25,6 +25,8 @@ <scheduled-thread-pool-max-size>12345</scheduled-thread-pool-max-size> <thread-pool-max-size>54321</thread-pool-max-size> <security-enabled>false</security-enabled> + <graceful-shutdown-enabled>true</graceful-shutdown-enabled> + <graceful-shutdown-timeout>12345</graceful-shutdown-timeout> <security-invalidation-interval>5423</security-invalidation-interval> <journal-lock-acquisition-timeout>123</journal-lock-acquisition-timeout> <wild-card-routing-enabled>true</wild-card-routing-enabled> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/754d481d/docs/user-manual/en/SUMMARY.md ---------------------------------------------------------------------- diff --git a/docs/user-manual/en/SUMMARY.md b/docs/user-manual/en/SUMMARY.md index df09786..400495c 100644 --- a/docs/user-manual/en/SUMMARY.md +++ b/docs/user-manual/en/SUMMARY.md @@ -37,6 +37,7 @@ * [Duplicate Message Detection](duplicate-detection.md) * [Clusters](clusters.md) * [High Availability and Failover](ha.md) +* [Graceful Server Shutdown](graceful-shutdown.md) * [Libaio Native Libraries](libaio.md) * [Thread management](thread-pooling.md) * [Logging](logging.md) http://git-wip-us.apache.org/repos/asf/activemq-6/blob/754d481d/docs/user-manual/en/configuration-index.md ---------------------------------------------------------------------- diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md index 3d9d478..8ece98e 100644 --- a/docs/user-manual/en/configuration-index.md +++ b/docs/user-manual/en/configuration-index.md @@ -1087,6 +1087,22 @@ element is used by the server side JMS service to load JMS Queues, Topics </tr> <tr> <td> + <a href="graceful-shutdown.md" title="Graceful Server Shutdown">graceful-shutdown-enabled</a> + </td> + <td>xsd:boolean</td> + <td>true means that graceful shutdown is enabled</td> + <td>true</td> + </tr> + <tr> + <td> + <a href="graceful-shutdown.md" title="Graceful Server Shutdown">graceful-shutdown-timeout</a> + </td> + <td>xsd:long</td> + <td>how long (in ms) to wait for all clients to disconnect before forcefully disconnecting the clients and proceeding with the shutdown process (-1 means no timeout)</td> + <td>-1</td> + </tr> + <tr> + <td> <a href="message-grouping.md" title="Chapter 28. Message Grouping">grouping-handler</a> </td> <td>Complex element</td> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/754d481d/docs/user-manual/en/graceful-shutdown.md ---------------------------------------------------------------------- diff --git a/docs/user-manual/en/graceful-shutdown.md b/docs/user-manual/en/graceful-shutdown.md new file mode 100644 index 0000000..a69aea4 --- /dev/null +++ b/docs/user-manual/en/graceful-shutdown.md @@ -0,0 +1,21 @@ +# Graceful Server Shutdown + +In certain circumstances an administrator might not want to disconnect +all clients immediately when stopping the broker. In this situation the +broker can be configured to shutdown *gracefully* using the +`graceful-shutdown-enabled` boolean configuration parameter. + +When the `graceful-shutdown-enabled` configuration parameter is `true` +and the broker is shutdown it will first prevent any additional clients +from connecting and then it will wait for any existing connections to +be terminated by the client before completing the shutdown process. The +default value is `false`. + +Of course, it's possible a client could keep a connection to the broker +indefinitely effectively preventing the broker from shutting down +gracefully. To deal with this of situation the +`graceful-shutdown-timeout` configuration parameter is available. This +tells the broker (in milliseconds) how long to wait for all clients to +disconnect before forcefully disconnecting the clients and proceeding +with the shutdown process. The default value is `-1` which means the +broker will wait indefinitely for clients to disconnect. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/754d481d/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/server/GracefulShutdownTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/server/GracefulShutdownTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/server/GracefulShutdownTest.java new file mode 100644 index 0000000..736fac8 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/server/GracefulShutdownTest.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.tests.integration.server; + +import org.apache.activemq.api.core.ActiveMQExceptionType; +import org.apache.activemq.api.core.ActiveMQSessionCreationException; +import org.apache.activemq.api.core.TransportConfiguration; +import org.apache.activemq.api.core.client.ActiveMQClient; +import org.apache.activemq.api.core.client.ClientProducer; +import org.apache.activemq.api.core.client.ClientSession; +import org.apache.activemq.api.core.client.ClientSessionFactory; +import org.apache.activemq.api.core.client.ServerLocator; +import org.apache.activemq.core.config.Configuration; +import org.apache.activemq.core.server.ActiveMQServer; +import org.apache.activemq.core.server.ActiveMQServers; +import org.apache.activemq.tests.util.ServiceTestBase; +import org.junit.Test; + +/** + * A GracefulShutdownTest + * + * @author Justin Bertram + */ +public class GracefulShutdownTest extends ServiceTestBase +{ + @Test + public void testGracefulShutdown() throws Exception + { + Configuration conf = createDefaultConfig(); + + conf.setGracefulShutdownEnabled(true); + + final ActiveMQServer server = ActiveMQServers.newActiveMQServer(conf, false); + + server.start(); + + ServerLocator locator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY)); + + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(true, true); + + Thread t = new Thread(new Runnable() + { + public void run() + { + try + { + server.stop(); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + }); + + t.setName("shutdown thread"); + t.start(); + + // wait for the thread to actually call stop() on the server + while (server.isStarted()) + { + Thread.sleep(100); + } + + // confirm we can still do work on the original connection even though the server is stopping + session.createQueue("testAddress", "testQueue"); + ClientProducer producer = session.createProducer("testAddress"); + producer.send(session.createMessage(true)); + session.start(); + assertNotNull(session.createConsumer("testQueue").receive(500)); + + try + { + sf.createSession(); + fail("Creating a session here should fail because the acceptors should be paused"); + } + catch (Exception e) + { + assertTrue(e instanceof ActiveMQSessionCreationException); + ActiveMQSessionCreationException activeMQSessionCreationException = (ActiveMQSessionCreationException) e; + assertEquals(activeMQSessionCreationException.getType(), ActiveMQExceptionType.SESSION_CREATION_REJECTED); + } + + // close the connection to allow broker shutdown to complete + locator.close(); + + long start = System.currentTimeMillis(); + + // wait for the shutdown thread to complete, interrupt it if it takes too long + while (t.isAlive()) + { + if (System.currentTimeMillis() - start > 3000) + { + t.interrupt(); + break; + } + Thread.sleep(100); + } + + // make sure the shutdown thread is dead + assertFalse(t.isAlive()); + } + + @Test + public void testGracefulShutdownWithTimeout() throws Exception + { + long timeout = 10000; + + Configuration conf = createDefaultConfig(); + + conf.setGracefulShutdownEnabled(true); + conf.setGracefulShutdownTimeout(timeout); + + final ActiveMQServer server = ActiveMQServers.newActiveMQServer(conf, false); + + server.start(); + + ServerLocator locator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY)); + + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(); + + Thread t = new Thread(new Runnable() + { + public void run() + { + try + { + server.stop(); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + }); + + t.setName("shutdown thread"); + long start = System.currentTimeMillis(); + t.start(); + + // wait for the thread to actually call stop() on the server + while (server.isStarted()) + { + Thread.sleep(100); + } + + try + { + sf.createSession(); + fail("Creating a session here should fail because the acceptors should be paused"); + } + catch (Exception e) + { + assertTrue(e instanceof ActiveMQSessionCreationException); + ActiveMQSessionCreationException activeMQSessionCreationException = (ActiveMQSessionCreationException) e; + assertEquals(activeMQSessionCreationException.getType(), ActiveMQExceptionType.SESSION_CREATION_REJECTED); + } + + Thread.sleep(timeout / 2); + + assertTrue("thread should still be alive here waiting for the timeout to elapse", t.isAlive()); + + while (t.isAlive()) + { + Thread.sleep(100); + } + + assertTrue("thread terminated too soon, the graceful shutdown timeout wasn't enforced properly", System.currentTimeMillis() - start >= timeout); + + locator.close(); + } +}
