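ARTEMIS-2064 make address & queue deployment more robust. Previously, any failure to deploy an address or queue would short-circuit the broker initialization process, preventing any other addresses or queues from being deployed, as well as other critical resources such as acceptors. With this change, a failure to deploy an individual address or queue is caught and logged as a warning so that the rest of broker initialization can continue.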
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b0d30d4d Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b0d30d4d Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b0d30d4d Branch: refs/heads/master Commit: b0d30d4da5708e2f46f9cb747e0b380d05f94526 Parents: 611cedf Author: Justin Bertram <jbert...@apache.org> Authored: Wed Aug 29 21:03:00 2018 -0500 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Thu Aug 30 15:10:44 2018 -0400 ---------------------------------------------------------------------- .../core/config/CoreAddressConfiguration.java | 10 ++++ .../core/config/CoreQueueConfiguration.java | 24 ++++++++ .../config/impl/LegacyJMSConfiguration.java | 30 +++++----- .../deployers/impl/FileConfigurationParser.java | 3 +- .../core/server/ActiveMQServerLogger.java | 10 ++++ .../core/server/impl/ActiveMQServerImpl.java | 58 +++++++++++--------- .../byteman/AddressDeploymentFailedTest.java | 45 +++++++++++++++ .../byteman/QueueDeploymentFailedTest.java | 47 ++++++++++++++++ 8 files changed, 182 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0d30d4d/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreAddressConfiguration.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreAddressConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreAddressConfiguration.java index 290d483..069222a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreAddressConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreAddressConfiguration.java @@ -65,4 +65,14 @@ public class 
CoreAddressConfiguration implements Serializable { public List<CoreQueueConfiguration> getQueueConfigurations() { return queueConfigurations; } + + @Override + public String toString() { + return "CoreAddressConfiguration[" + + "name=" + name + + ", routingTypes=" + routingTypes + + ", queueConfigurations=" + queueConfigurations + + "]"; + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0d30d4d/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java index 2ccae2d..87e938e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java @@ -201,6 +201,7 @@ public class CoreQueueConfiguration implements Serializable { result = prime * result + ((lastValue == null) ? 0 : lastValue.hashCode()); result = prime * result + ((consumersBeforeDispatch == null) ? 0 : consumersBeforeDispatch.hashCode()); result = prime * result + ((delayBeforeDispatch == null) ? 0 : delayBeforeDispatch.hashCode()); + result = prime * result + ((routingType == null) ? 
0 : routingType.hashCode()); return result; } @@ -265,6 +266,29 @@ public class CoreQueueConfiguration implements Serializable { } else if (!delayBeforeDispatch.equals(other.delayBeforeDispatch)) { return false; } + if (routingType == null) { + if (other.routingType != null) + return false; + } else if (!routingType.equals(other.routingType)) { + return false; + } return true; } + + @Override + public String toString() { + return "CoreQueueConfiguration[" + + "name=" + name + + ", address=" + address + + ", routingType=" + routingType + + ", durable=" + durable + + ", filterString=" + filterString + + ", maxConsumers=" + maxConsumers + + ", purgeOnNoConsumers=" + purgeOnNoConsumers + + ", exclusive=" + exclusive + + ", lastValue=" + lastValue + + ", consumersBeforeDispatch=" + consumersBeforeDispatch + + ", delayBeforeDispatch=" + delayBeforeDispatch + + "]"; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0d30d4d/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/LegacyJMSConfiguration.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/LegacyJMSConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/LegacyJMSConfiguration.java index a79402c..dc50917 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/LegacyJMSConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/LegacyJMSConfiguration.java @@ -21,7 +21,6 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.Reader; import java.net.URL; -import java.util.List; import java.util.Map; import org.apache.activemq.artemis.api.core.RoutingType; @@ -150,10 +149,9 @@ public class LegacyJMSConfiguration implements Deployable { */ public void parseTopicConfiguration(final Node node) throws Exception { String topicName = 
node.getAttributes().getNamedItem(NAME_ATTR).getNodeValue(); - List<CoreAddressConfiguration> coreAddressConfigurations = configuration.getAddressConfigurations(); - coreAddressConfigurations.add(new CoreAddressConfiguration() - .setName(topicName) - .addRoutingType(RoutingType.MULTICAST)); + configuration.addAddressConfiguration(new CoreAddressConfiguration() + .setName(topicName) + .addRoutingType(RoutingType.MULTICAST)); } /** @@ -173,22 +171,22 @@ public class LegacyJMSConfiguration implements Deployable { for (int i = 0; i < children.getLength(); i++) { Node child = children.item(i); - if (QUEUE_SELECTOR_NODE_NAME.equals(children.item(i).getNodeName())) { - Node selectorNode = children.item(i); + if (QUEUE_SELECTOR_NODE_NAME.equals(child.getNodeName())) { + Node selectorNode = child; Node attNode = selectorNode.getAttributes().getNamedItem("string"); selectorString = attNode.getNodeValue(); } } - List<CoreAddressConfiguration> coreAddressConfigurations = configuration.getAddressConfigurations(); - coreAddressConfigurations.add(new CoreAddressConfiguration() - .setName(queueName) - .addRoutingType(RoutingType.ANYCAST) - .addQueueConfiguration(new CoreQueueConfiguration() - .setAddress(queueName) - .setName(queueName) - .setFilterString(selectorString) - .setRoutingType(RoutingType.ANYCAST))); + configuration.addAddressConfiguration(new CoreAddressConfiguration() + .setName(queueName) + .addRoutingType(RoutingType.ANYCAST) + .addQueueConfiguration(new CoreQueueConfiguration() + .setAddress(queueName) + .setName(queueName) + .setFilterString(selectorString) + .setDurable(durable) + .setRoutingType(RoutingType.ANYCAST))); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0d30d4d/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java ---------------------------------------------------------------------- diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index dc5ba16..b1b2c0e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -770,8 +770,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { Element node = (Element) elements.item(0); NodeList list = node.getElementsByTagName("address"); for (int i = 0; i < list.getLength(); i++) { - CoreAddressConfiguration addrConfig = parseAddressConfiguration(list.item(i)); - config.getAddressConfigurations().add(addrConfig); + config.addAddressConfiguration(parseAddressConfiguration(list.item(i))); } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0d30d4d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 6aa53f6..e6b7b48 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1593,6 +1593,16 @@ public interface ActiveMQServerLogger extends BasicLogger { @Message(id = 22273, value = "Address \"{0}\" is full. 
Bridge {1} will disconnect", format = Message.Format.MESSAGE_FORMAT) void bridgeAddressFull(String addressName, String bridgeName); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222274, value = "Failed to deploy address {0}: {1}", + format = Message.Format.MESSAGE_FORMAT) + void problemDeployingAddress(String addressName, String message); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222275, value = "Failed to deploy queue {0}: {1}", + format = Message.Format.MESSAGE_FORMAT) + void problemDeployingQueue(String queueName, String message); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) void initializationError(@Cause Throwable e); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0d30d4d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 62d3c6e..6cb0515 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2742,39 +2742,43 @@ public class ActiveMQServerImpl implements ActiveMQServer { private void deployAddressesFromConfiguration(Configuration configuration) throws Exception { for (CoreAddressConfiguration config : configuration.getAddressConfigurations()) { - AddressInfo info = new AddressInfo(SimpleString.toSimpleString(config.getName()), config.getRoutingTypes()); - addOrUpdateAddressInfo(info); - ActiveMQServerLogger.LOGGER.deployAddress(config.getName(), config.getRoutingTypes().toString()); - 
deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations()); + try { + ActiveMQServerLogger.LOGGER.deployAddress(config.getName(), config.getRoutingTypes().toString()); + AddressInfo info = new AddressInfo(SimpleString.toSimpleString(config.getName()), config.getRoutingTypes()); + addOrUpdateAddressInfo(info); + deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations()); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.problemDeployingAddress(config.getName(), e.getMessage()); + } } } private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception { for (CoreQueueConfiguration config : queues) { - SimpleString queueName = SimpleString.toSimpleString(config.getName()); - ActiveMQServerLogger.LOGGER.deployQueue(config.getName(), config.getAddress(), config.getRoutingType().toString()); - AddressSettings as = addressSettingsRepository.getMatch(config.getAddress()); - // determine if there is an address::queue match; update it if so - int maxConsumers = config.getMaxConsumers() == null ? as.getDefaultMaxConsumers() : config.getMaxConsumers(); - boolean isExclusive = config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive(); - boolean isLastValue = config.isLastValue() == null ? as.isDefaultLastValueQueue() : config.isLastValue(); - int consumersBeforeDispatch = config.getConsumersBeforeDispatch() == null ? as.getDefaultConsumersBeforeDispatch() : config.getConsumersBeforeDispatch(); - long delayBeforeDispatch = config.getDelayBeforeDispatch() == null ? 
as.getDefaultDelayBeforeDispatch() : config.getDelayBeforeDispatch(); - - if (locateQueue(queueName) != null && locateQueue(queueName).getAddress().toString().equals(config.getAddress())) { - updateQueue(config.getName(), config.getRoutingType(), maxConsumers, config.getPurgeOnNoConsumers(), - isExclusive, consumersBeforeDispatch, delayBeforeDispatch, config.getUser()); - } else { - // if the address::queue doesn't exist then create it - try { - createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(), - queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()), - config.isDurable(),false,false,false,false, maxConsumers, config.getPurgeOnNoConsumers(), - isExclusive, isLastValue, consumersBeforeDispatch, delayBeforeDispatch, true); - } catch (ActiveMQQueueExistsException e) { - // the queue may exist on a *different* address - ActiveMQServerLogger.LOGGER.warn(e.getMessage()); + try { + SimpleString queueName = SimpleString.toSimpleString(config.getName()); + ActiveMQServerLogger.LOGGER.deployQueue(config.getName(), config.getAddress(), config.getRoutingType().toString()); + AddressSettings as = addressSettingsRepository.getMatch(config.getAddress()); + // determine if there is an address::queue match; update it if so + int maxConsumers = config.getMaxConsumers() == null ? as.getDefaultMaxConsumers() : config.getMaxConsumers(); + boolean isExclusive = config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive(); + boolean isLastValue = config.isLastValue() == null ? as.isDefaultLastValueQueue() : config.isLastValue(); + int consumersBeforeDispatch = config.getConsumersBeforeDispatch() == null ? as.getDefaultConsumersBeforeDispatch() : config.getConsumersBeforeDispatch(); + long delayBeforeDispatch = config.getDelayBeforeDispatch() == null ? 
as.getDefaultDelayBeforeDispatch() : config.getDelayBeforeDispatch(); + + if (locateQueue(queueName) != null && locateQueue(queueName).getAddress().toString().equals(config.getAddress())) { + updateQueue(config.getName(), config.getRoutingType(), maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, consumersBeforeDispatch, delayBeforeDispatch, config.getUser()); + } else { + // if the address::queue doesn't exist then create it + try { + createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(), queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()), config.isDurable(), false, false, false, false, maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, isLastValue, consumersBeforeDispatch, delayBeforeDispatch, true); + } catch (ActiveMQQueueExistsException e) { + // the queue may exist on a *different* address + ActiveMQServerLogger.LOGGER.warn(e.getMessage()); + } } + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.problemDeployingQueue(config.getName(), e.getMessage()); } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0d30d4d/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/AddressDeploymentFailedTest.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/AddressDeploymentFailedTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/AddressDeploymentFailedTest.java new file mode 100644 index 0000000..0d19158 --- /dev/null +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/AddressDeploymentFailedTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.extras.byteman; + +import java.util.UUID; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(BMUnitRunner.class) +public class AddressDeploymentFailedTest extends ActiveMQTestBase { + + @Test + @BMRule(name = "blow up address deployment", + targetClass = "org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl", + targetMethod = "addOrUpdateAddressInfo(AddressInfo)", + targetLocation = "EXIT", + action = "throw new IllegalStateException(\"test exception\")") + public void testAddressDeploymentFailure() throws Exception { + ActiveMQServer server = createServer(false, createDefaultNettyConfig()); + server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(UUID.randomUUID().toString()).addRoutingType(RoutingType.ANYCAST)); + server.start(); + assertTrue(server.getRemotingService().isStarted()); + } +} 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b0d30d4d/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/QueueDeploymentFailedTest.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/QueueDeploymentFailedTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/QueueDeploymentFailedTest.java new file mode 100644 index 0000000..d01215a --- /dev/null +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/QueueDeploymentFailedTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.tests.extras.byteman; + +import java.util.UUID; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; +import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(BMUnitRunner.class) +public class QueueDeploymentFailedTest extends ActiveMQTestBase { + + @Test + @BMRule(name = "blow up queue deployment", + targetClass = "org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl", + targetMethod = "createQueue(SimpleString,RoutingType,SimpleString,SimpleString,SimpleString,boolean,boolean,boolean,boolean,boolean,int,boolean,boolean,boolean,int,long,boolean", + targetLocation = "EXIT", + action = "throw new IllegalStateException(\"test exception\")") + public void testQueueDeploymentFailure() throws Exception { + ActiveMQServer server = createServer(false, createDefaultNettyConfig()); + String address = UUID.randomUUID().toString(); + server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(address).addRoutingType(RoutingType.ANYCAST).addQueueConfiguration(new CoreQueueConfiguration().setName(UUID.randomUUID().toString()).setRoutingType(RoutingType.ANYCAST).setAddress(address))); + server.start(); + assertTrue(server.getRemotingService().isStarted()); + } +}