Repository: activemq-artemis Updated Branches: refs/heads/2.6.x 685211434 -> 4ce8743f2
ARTEMIS-2064 make address & queue deployment more robust Any failure to deploy an address or queue will short-circuit the broker initialization process preventing any other addresses or queues from being deployed as well as other critical resources like acceptors, etc. (cherry picked from commit b0d30d4da5708e2f46f9cb747e0b380d05f94526) Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4ce8743f Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4ce8743f Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4ce8743f Branch: refs/heads/2.6.x Commit: 4ce8743f200423a19edbc2c950478323344ebe69 Parents: 6852114 Author: Justin Bertram <jbert...@apache.org> Authored: Wed Aug 29 21:03:00 2018 -0500 Committer: Justin Bertram <jbert...@apache.org> Committed: Thu Aug 30 14:55:41 2018 -0500 ---------------------------------------------------------------------- .../core/config/CoreAddressConfiguration.java | 10 ++++ .../core/config/CoreQueueConfiguration.java | 22 ++++++++ .../deployers/impl/FileConfigurationParser.java | 3 +- .../core/server/ActiveMQServerLogger.java | 10 ++++ .../core/server/impl/ActiveMQServerImpl.java | 58 +++++++++++--------- .../byteman/AddressDeploymentFailedTest.java | 45 +++++++++++++++ .../byteman/QueueDeploymentFailedTest.java | 47 ++++++++++++++++ 7 files changed, 168 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ce8743f/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreAddressConfiguration.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreAddressConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreAddressConfiguration.java index 290d483..069222a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreAddressConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreAddressConfiguration.java @@ -65,4 +65,14 @@ public class CoreAddressConfiguration implements Serializable { public List<CoreQueueConfiguration> getQueueConfigurations() { return queueConfigurations; } + + @Override + public String toString() { + return "CoreAddressConfiguration[" + + "name=" + name + + ", routingTypes=" + routingTypes + + ", queueConfigurations=" + queueConfigurations + + "]"; + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ce8743f/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java index f301b90..75e102f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java @@ -183,6 +183,7 @@ public class CoreQueueConfiguration implements Serializable { result = prime * result + ((exclusive == null) ? 0 : exclusive.hashCode()); result = prime * result + ((lastValue == null) ? 0 : lastValue.hashCode()); result = prime * result + (maxConsumerConfigured ? 1331 : 1337); + result = prime * result + ((routingType == null) ? 0 : routingType.hashCode()); return result; } @@ -237,6 +238,27 @@ public class CoreQueueConfiguration implements Serializable { } else if (!lastValue.equals(other.lastValue)) { return false; } + if (routingType == null) { + if (other.routingType != null) + return false; + } else if (!routingType.equals(other.routingType)) { + return false; + } return true; } + + @Override + public String toString() { + return "CoreQueueConfiguration[" + + "name=" + name + + ", address=" + address + + ", routingType=" + routingType + + ", durable=" + durable + + ", filterString=" + filterString + + ", maxConsumers=" + maxConsumers + + ", purgeOnNoConsumers=" + purgeOnNoConsumers + + ", exclusive=" + exclusive + + ", lastValue=" + lastValue + + "]"; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ce8743f/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 17f3b67..c87fe04 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -764,8 +764,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { Element node = (Element) elements.item(0); NodeList list = node.getElementsByTagName("address"); for (int i = 0; i < list.getLength(); i++) { - CoreAddressConfiguration addrConfig = parseAddressConfiguration(list.item(i)); - config.getAddressConfigurations().add(addrConfig); + config.addAddressConfiguration(parseAddressConfiguration(list.item(i))); } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ce8743f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 6b79672..cc9674e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1593,6 +1593,16 @@ public interface ActiveMQServerLogger extends BasicLogger { @Message(id = 22273, value = "Address \"{0}\" is full. Bridge {1} will disconnect", format = Message.Format.MESSAGE_FORMAT) void bridgeAddressFull(String addressName, String bridgeName); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222274, value = "Failed to deploy address {0}: {1}", + format = Message.Format.MESSAGE_FORMAT) + void problemDeployingAddress(String addressName, String message); + + @LogMessage(level = Logger.Level.WARN) + @Message(id = 222275, value = "Failed to deploy queue {0}: {1}", + format = Message.Format.MESSAGE_FORMAT) + void problemDeployingQueue(String queueName, String message); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT) void initializationError(@Cause Throwable e); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ce8743f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 1e56af4..3c427b2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2530,37 +2530,45 @@ public class ActiveMQServerImpl implements ActiveMQServer { private void deployAddressesFromConfiguration(Configuration configuration) throws Exception { for (CoreAddressConfiguration config : configuration.getAddressConfigurations()) { - AddressInfo info = new AddressInfo(SimpleString.toSimpleString(config.getName()), config.getRoutingTypes()); - addOrUpdateAddressInfo(info); - ActiveMQServerLogger.LOGGER.deployAddress(config.getName(), config.getRoutingTypes().toString()); - deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations()); + try { + ActiveMQServerLogger.LOGGER.deployAddress(config.getName(), config.getRoutingTypes().toString()); + AddressInfo info = new AddressInfo(SimpleString.toSimpleString(config.getName()), config.getRoutingTypes()); + addOrUpdateAddressInfo(info); + deployQueuesFromListCoreQueueConfiguration(config.getQueueConfigurations()); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.problemDeployingAddress(config.getName(), e.getMessage()); + } } } private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception { for (CoreQueueConfiguration config : queues) { - SimpleString queueName = SimpleString.toSimpleString(config.getName()); - ActiveMQServerLogger.LOGGER.deployQueue(config.getName(), config.getAddress(), config.getRoutingType().toString()); - AddressSettings as = addressSettingsRepository.getMatch(config.getAddress()); - // determine if there is an address::queue match; update it if so - int maxConsumerAddressSetting = as.getDefaultMaxConsumers(); - int maxConsumerQueueConfig = config.getMaxConsumers(); - int maxConsumer = (config.isMaxConsumerConfigured()) ? maxConsumerQueueConfig : maxConsumerAddressSetting; - if (locateQueue(queueName) != null && locateQueue(queueName).getAddress().toString().equals(config.getAddress())) { - updateQueue(config.getName(), config.getRoutingType(), maxConsumer, config.getPurgeOnNoConsumers(), - config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive()); - } else { - // if the address::queue doesn't exist then create it - try { - createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(), - queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()), - config.isDurable(),false,false,false,false,maxConsumer,config.getPurgeOnNoConsumers(), - config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive(), - config.isLastValue() == null ? as.isDefaultLastValueQueue() : config.isLastValue(), true); - } catch (ActiveMQQueueExistsException e) { - // the queue may exist on a *different* address - ActiveMQServerLogger.LOGGER.warn(e.getMessage()); + try { + SimpleString queueName = SimpleString.toSimpleString(config.getName()); + ActiveMQServerLogger.LOGGER.deployQueue(config.getName(), config.getAddress(), config.getRoutingType().toString()); + AddressSettings as = addressSettingsRepository.getMatch(config.getAddress()); + // determine if there is an address::queue match; update it if so + int maxConsumerAddressSetting = as.getDefaultMaxConsumers(); + int maxConsumerQueueConfig = config.getMaxConsumers(); + int maxConsumer = (config.isMaxConsumerConfigured()) ? maxConsumerQueueConfig : maxConsumerAddressSetting; + if (locateQueue(queueName) != null && locateQueue(queueName).getAddress().toString().equals(config.getAddress())) { + updateQueue(config.getName(), config.getRoutingType(), maxConsumer, config.getPurgeOnNoConsumers(), + config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive()); + } else { + // if the address::queue doesn't exist then create it + try { + createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(), + queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()), + config.isDurable(),false,false,false,false,maxConsumer,config.getPurgeOnNoConsumers(), + config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive(), + config.isLastValue() == null ? as.isDefaultLastValueQueue() : config.isLastValue(), true); + } catch (ActiveMQQueueExistsException e) { + // the queue may exist on a *different* address + ActiveMQServerLogger.LOGGER.warn(e.getMessage()); + } } + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.problemDeployingQueue(config.getName(), e.getMessage()); } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ce8743f/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/AddressDeploymentFailedTest.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/AddressDeploymentFailedTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/AddressDeploymentFailedTest.java new file mode 100644 index 0000000..0d19158 --- /dev/null +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/AddressDeploymentFailedTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.extras.byteman; + +import java.util.UUID; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(BMUnitRunner.class) +public class AddressDeploymentFailedTest extends ActiveMQTestBase { + + @Test + @BMRule(name = "blow up address deployment", + targetClass = "org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl", + targetMethod = "addOrUpdateAddressInfo(AddressInfo)", + targetLocation = "EXIT", + action = "throw new IllegalStateException(\"test exception\")") + public void testAddressDeploymentFailure() throws Exception { + ActiveMQServer server = createServer(false, createDefaultNettyConfig()); + server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(UUID.randomUUID().toString()).addRoutingType(RoutingType.ANYCAST)); + server.start(); + assertTrue(server.getRemotingService().isStarted()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ce8743f/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/QueueDeploymentFailedTest.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/QueueDeploymentFailedTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/QueueDeploymentFailedTest.java new file mode 100644 index 0000000..d01215a --- /dev/null +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/QueueDeploymentFailedTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.extras.byteman; + +import java.util.UUID; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; +import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(BMUnitRunner.class) +public class QueueDeploymentFailedTest extends ActiveMQTestBase { + + @Test + @BMRule(name = "blow up queue deployment", + targetClass = "org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl", + targetMethod = "createQueue(SimpleString,RoutingType,SimpleString,SimpleString,SimpleString,boolean,boolean,boolean,boolean,boolean,int,boolean,boolean,boolean,int,long,boolean", + targetLocation = "EXIT", + action = "throw new IllegalStateException(\"test exception\")") + public void testQueueDeploymentFailure() throws Exception { + ActiveMQServer server = createServer(false, createDefaultNettyConfig()); + String address = UUID.randomUUID().toString(); + server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(address).addRoutingType(RoutingType.ANYCAST).addQueueConfiguration(new CoreQueueConfiguration().setName(UUID.randomUUID().toString()).setRoutingType(RoutingType.ANYCAST).setAddress(address))); + server.start(); + assertTrue(server.getRemotingService().isStarted()); + } +}