This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch 2.27.x in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 64918caa100cfb00b23e55ada5b3eb30a3f6c677 Author: Šmucr Jan <[email protected]> AuthorDate: Tue Nov 1 14:00:18 2022 +0100 ARTEMIS-4078 Fix divert reloading Reloading has been fixed for divert: * filter * address * exclusive Source address and exclusivity changes require divert redeployment. (cherry picked from commit 43824fc494e6ed67a16a5512865362f74bacc953) --- .../core/server/impl/ActiveMQServerImpl.java | 31 ++- .../tests/integration/jms/RedeployTest.java | 298 +++++++++++++++++---- .../resources/reload-divert-address-source1.xml | 64 +++++ .../resources/reload-divert-address-source2.xml | 64 +++++ .../resources/reload-divert-address-target1.xml | 64 +++++ .../resources/reload-divert-address-target2.xml | 64 +++++ .../src/test/resources/reload-divert-exclusive.xml | 63 +++++ .../test/resources/reload-divert-filter-none.xml | 63 +++++ .../test/resources/reload-divert-filter-x-eq-x.xml | 64 +++++ .../test/resources/reload-divert-filter-x-eq-y.xml | 64 +++++ .../test/resources/reload-divert-non-exclusive.xml | 63 +++++ 11 files changed, 845 insertions(+), 57 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index a0e499ca13..af278ebfd9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2867,8 +2867,12 @@ public class ActiveMQServerImpl implements ActiveMQServer { final Divert divert = divertBinding.getDivert(); Filter filter = FilterImpl.createFilter(config.getFilterString()); - if (filter != null && !filter.equals(divert.getFilter())) { - divert.setFilter(filter); + if (filter == null) { + divert.setFilter(null); + } else { + if (!filter.equals(divert.getFilter())) { + divert.setFilter(filter); + } } if
(config.getTransformerConfiguration() != null) { @@ -2880,8 +2884,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { if (config.getForwardingAddress() != null) { SimpleString forwardAddress = SimpleString.toSimpleString(config.getForwardingAddress()); - - if (!forwardAddress.equals(config)) { + if (!forwardAddress.equals(divert.getForwardAddress())) { divert.setForwardAddress(forwardAddress); } } @@ -4445,16 +4448,34 @@ public class ActiveMQServerImpl implements ActiveMQServer { recoverStoredAddressSettings(); ActiveMQServerLogger.LOGGER.reloadingConfiguration("diverts"); + // Filter out all active diverts final Set<SimpleString> divertsToRemove = postOffice.getAllBindings() .filter(binding -> binding instanceof DivertBinding) .map(Binding::getUniqueName) .collect(Collectors.toSet()); + // Go through the currently configured diverts for (DivertConfiguration divertConfig : configuration.getDivertConfigurations()) { + // Retain diverts still configured to exist divertsToRemove.remove(SimpleString.toSimpleString(divertConfig.getName())); - if (postOffice.getBinding(new SimpleString(divertConfig.getName())) == null) { + // Deploy newly added diverts, reconfigure existing + final SimpleString divertName = new SimpleString(divertConfig.getName()); + final DivertBinding divertBinding = (DivertBinding) postOffice.getBinding(divertName); + if (divertBinding == null) { deployDivert(divertConfig); + } else { + if ((divertBinding.isExclusive() != divertConfig.isExclusive()) || + !divertBinding.getAddress().toString().equals(divertConfig.getAddress())) { + // Diverts whose exclusivity or address has changed have to be redeployed. + // See the Divert interface and look for setters. Absent setter is a hint that maybe that property is immutable. + destroyDivert(divertName); + deployDivert(divertConfig); + } else { + // Diverts with their exclusivity and address unchanged can be updated directly. 
+ updateDivert(divertConfig); + } } } + // Remove all remaining diverts for (final SimpleString divertName : divertsToRemove) { try { destroyDivert(divertName); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java index c893d7514d..035772364a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java @@ -17,12 +17,14 @@ package org.apache.activemq.artemis.tests.integration.jms; +import java.io.InputStream; import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -51,6 +53,7 @@ import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancing import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl; import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ; import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.server.reload.ReloadManager; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; @@ -566,31 +569,28 @@ public class RedeployTest extends ActiveMQTestBase { } } - private void deployBrokerConfig(EmbeddedActiveMQ server, URL configFile) throws Exception { - + private void deployBrokerConfig(EmbeddedActiveMQ server, String configFileName) throws Exception { Path brokerXML = getTestDirfile().toPath().resolve("broker.xml"); 
- Files.copy(configFile.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING); - brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000); - final ReusableLatch latch = new ReusableLatch(1); - Runnable tick = latch::countDown; - server.getActiveMQServer().getReloadManager().setTick(tick); - - latch.await(10, TimeUnit.SECONDS); + final ReloadManager reloadManager = server.getActiveMQServer().getReloadManager(); + final boolean reloadManagerOriginallyStarted = reloadManager.isStarted(); + try { + reloadManager.stop(); + final URL configFile = RedeployTest.class.getClassLoader().getResource(configFileName); + assertNotNull(configFile); + try (InputStream configStream = configFile.openStream()) { + Files.copy(configStream, brokerXML, StandardCopyOption.REPLACE_EXISTING); + } + server.getActiveMQServer().reloadConfigurationFile(); + } finally { + if (reloadManagerOriginallyStarted) { + reloadManager.start(); + } + } } - private void doTestRemoveFilter(URL testConfiguration) throws Exception { - - Path brokerXML = getTestDirfile().toPath().resolve("broker.xml"); - - URL baseConfig = RedeployTest.class.getClassLoader().getResource("reload-queue-filter.xml"); - - Files.copy(baseConfig.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING); + private void doTestQueueRemoveFilter(String testConfigurationFileName) throws Exception { - EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ(); - embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString()); - embeddedActiveMQ.start(); - - deployBrokerConfig(embeddedActiveMQ, baseConfig); + final EmbeddedActiveMQ embeddedActiveMQ = createEmbeddedActiveMQServer("reload-queue-filter.xml"); try { @@ -628,7 +628,7 @@ public class RedeployTest extends ActiveMQTestBase { consumer.close(); } - deployBrokerConfig(embeddedActiveMQ, testConfiguration); + deployBrokerConfig(embeddedActiveMQ, testConfigurationFileName); try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); Connection 
connection = factory.createConnection(); @@ -666,26 +666,20 @@ public class RedeployTest extends ActiveMQTestBase { } @Test - public void testRedeployRemoveFilter() throws Exception { - doTestRemoveFilter(RedeployTest.class.getClassLoader().getResource("reload-queue-filter-updated-empty.xml")); - doTestRemoveFilter(RedeployTest.class.getClassLoader().getResource("reload-queue-filter-removed.xml")); + public void testRedeployRemoveQueueFilter() throws Exception { + doTestQueueRemoveFilter("reload-queue-filter-updated-empty.xml"); + doTestQueueRemoveFilter("reload-queue-filter-removed.xml"); } /** * This one is here just to make sure it's possible to change queue parameters one by one without setting the others * to <code>null</code>. - * @throws Exception + * @throws Exception An exception. */ @Test public void testQueuePartialReconfiguration() throws Exception { - Path brokerXML = getTestDirfile().toPath().resolve("broker.xml"); - URL url = RedeployTest.class.getClassLoader().getResource("reload-empty.xml"); - Files.copy(url.openStream(), brokerXML); - - EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ(); - embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString()); - embeddedActiveMQ.start(); + final EmbeddedActiveMQ embeddedActiveMQ = createEmbeddedActiveMQServer("reload-empty.xml"); try { @@ -708,13 +702,7 @@ public class RedeployTest extends ActiveMQTestBase { @Test public void testRedeployQueueDefaults() throws Exception { - Path brokerXML = getTestDirfile().toPath().resolve("broker.xml"); - URL baseConfig = RedeployTest.class.getClassLoader().getResource("reload-queue-defaults-before.xml"); - URL newConfig = RedeployTest.class.getClassLoader().getResource("reload-queue-defaults-after.xml"); - Files.copy(baseConfig.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING); - EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ(); - embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString()); - embeddedActiveMQ.start(); + 
final EmbeddedActiveMQ embeddedActiveMQ = createEmbeddedActiveMQServer("reload-queue-defaults-before.xml"); try { LocalQueueBinding queueBinding = (LocalQueueBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice() @@ -736,7 +724,7 @@ public class RedeployTest extends ActiveMQTestBase { assertEquals(new SimpleString("jdoe"), queue.getUser()); assertNotEquals(ActiveMQDefaultConfiguration.getDefaultRingSize(), queue.getRingSize()); - deployBrokerConfig(embeddedActiveMQ, newConfig); + deployBrokerConfig(embeddedActiveMQ, "reload-queue-defaults-after.xml"); assertEquals(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), queue.getMaxConsumers()); assertEquals(RoutingType.MULTICAST, queue.getRoutingType()); @@ -761,21 +749,15 @@ public class RedeployTest extends ActiveMQTestBase { @Test public void testUndeployDivert() throws Exception { - Path brokerXML = getTestDirfile().toPath().resolve("broker.xml"); - URL baseConfig = RedeployTest.class.getClassLoader().getResource("reload-divert-undeploy-before.xml"); - URL newConfig = RedeployTest.class.getClassLoader().getResource("reload-divert-undeploy-after.xml"); - Files.copy(baseConfig.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING); - EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ(); - embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString()); - embeddedActiveMQ.start(); + final EmbeddedActiveMQ embeddedActiveMQ = createEmbeddedActiveMQServer("reload-divert-undeploy-before.xml"); try { DivertBinding divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice() .getBinding(new SimpleString("divert")); assertNotNull(divertBinding); - Queue sourceQueue = (Queue) ActiveMQDestination.createDestination("queue://source", ActiveMQDestination.TYPE.QUEUE); - Queue targetQueue = (Queue) ActiveMQDestination.createDestination("queue://target", ActiveMQDestination.TYPE.QUEUE); + final Queue sourceQueue = (Queue) ActiveMQDestination.createDestination("queue://source", 
ActiveMQDestination.TYPE.QUEUE); + final Queue targetQueue = (Queue) ActiveMQDestination.createDestination("queue://target", ActiveMQDestination.TYPE.QUEUE); try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); Connection connection = factory.createConnection(); @@ -785,13 +767,13 @@ public class RedeployTest extends ActiveMQTestBase { MessageConsumer targetConsumer = session.createConsumer(targetQueue)) { connection.start(); - Message message = session.createTextMessage("Hello world"); + final Message message = session.createTextMessage("Hello world"); sourceProducer.send(message); assertNotNull(sourceConsumer.receive(2000)); assertNotNull(targetConsumer.receive(2000)); } - deployBrokerConfig(embeddedActiveMQ, newConfig); + deployBrokerConfig(embeddedActiveMQ, "reload-divert-undeploy-after.xml"); Wait.waitFor(() -> embeddedActiveMQ.getActiveMQServer().getPostOffice() .getBinding(new SimpleString("divert")) == null); @@ -817,6 +799,218 @@ public class RedeployTest extends ActiveMQTestBase { } } + private void sendDivertedTestMessage(Queue queue, Queue forwardingQueue, boolean shouldReceiveFromQueue, boolean shouldReceiveFromForwardingQueue, Map<String, String> properties) throws JMSException { + try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); + Connection connection = factory.createConnection(); + Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + MessageProducer queueProducer = session.createProducer(queue); + MessageConsumer queueConsumer = session.createConsumer(queue); + MessageConsumer forwardingQueueConsumer = session.createConsumer(forwardingQueue)) { + + connection.start(); + final Message message = session.createTextMessage("Hello world"); + for (Map.Entry<String, String> entry : properties.entrySet()) { + message.setStringProperty(entry.getKey(), entry.getValue()); + } + queueProducer.send(message); + + final Message queueMessage = queueConsumer.receive(2000); + final Message 
forwardingQueueMessage = forwardingQueueConsumer.receive(2000); + if (shouldReceiveFromQueue) { + assertNotNull("A message should have been received from the '" + queue.getQueueName() + "' queue.", queueMessage); + } else { + assertNull("No message should have been received from the '" + queue.getQueueName() + "' queue.", queueMessage); + } + if (shouldReceiveFromForwardingQueue) { + assertNotNull("A message should have been received from the '" + forwardingQueue.getQueueName() + "' forwarding queue.", forwardingQueueMessage); + } else { + assertNull("No message should have been received from the '" + forwardingQueue.getQueueName() + "' forwarding queue.", forwardingQueueMessage); + } + } + } + + private EmbeddedActiveMQ createEmbeddedActiveMQServer(String initialConfigFileName) throws Exception { + final Path brokerXML = getTestDirfile().toPath().resolve("broker.xml"); + final URL baseConfig = RedeployTest.class.getClassLoader().getResource(initialConfigFileName); + assertNotNull(baseConfig); + try (InputStream configStream = baseConfig.openStream()) { + Files.copy(configStream, brokerXML, StandardCopyOption.REPLACE_EXISTING); + } + final EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ(); + embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString()); + embeddedActiveMQ.start(); + waitForServerToStart(embeddedActiveMQ.getActiveMQServer()); + return embeddedActiveMQ; + } + + @Test + public void testAddDivertFilter() throws Exception { + + final EmbeddedActiveMQ embeddedActiveMQ = createEmbeddedActiveMQServer("reload-divert-filter-none.xml"); + + final SimpleString divertName = new SimpleString("source-to-target"); + final Queue sourceQueue = (Queue) ActiveMQDestination.createDestination("queue://source", ActiveMQDestination.TYPE.QUEUE); + final Queue targetQueue = (Queue) ActiveMQDestination.createDestination("queue://target", ActiveMQDestination.TYPE.QUEUE); + final Map<String, String> emptyTestMessageProperties = Map.of(); + final Map<String, 
String> testMessagePropertiesXX = Map.of("x", "x"); + + try { + DivertBinding divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(divertName); + assertNotNull("Divert '" + divertName + "' binding should exist.", divertBinding); + assertNull("The divert '" + divertName + "' should have no filter applied at first.", divertBinding.getFilter()); + + assertNull(embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(new SimpleString("foo"))); + + // Message with no properties should be diverted. + sendDivertedTestMessage(sourceQueue, targetQueue, false, true, emptyTestMessageProperties); + // Message with properties should be diverted too. + sendDivertedTestMessage(sourceQueue, targetQueue, false, true, testMessagePropertiesXX); + + // Add filter + deployBrokerConfig(embeddedActiveMQ, "reload-divert-filter-x-eq-x.xml"); + + divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(divertName); + assertNotNull("Divert '" + divertName + "' binding should exist.", divertBinding); + assertNotNull("The divert '" + divertName + "' should have a filter applied after the new configuration is loaded.", divertBinding.getFilter()); + + // Message with no properties SHOULD NOT be diverted. + sendDivertedTestMessage(sourceQueue, targetQueue, true, false, emptyTestMessageProperties); + // Message with property x == "x" SHOULD be diverted. 
+ sendDivertedTestMessage(sourceQueue, targetQueue, false, true, testMessagePropertiesXX); + } finally { + embeddedActiveMQ.stop(); + } + } + + @Test + public void testRemoveDivertFilter() throws Exception { + + final EmbeddedActiveMQ embeddedActiveMQ = createEmbeddedActiveMQServer("reload-divert-filter-x-eq-x.xml"); + final SimpleString divertName = new SimpleString("source-to-target"); + final Queue sourceQueue = (Queue) ActiveMQDestination.createDestination("queue://source", ActiveMQDestination.TYPE.QUEUE); + final Queue targetQueue = (Queue) ActiveMQDestination.createDestination("queue://target", ActiveMQDestination.TYPE.QUEUE); + final Map<String, String> emptyTestMessageProperties = Map.of(); + final Map<String, String> testMessagePropertiesXX = Map.of("x", "x"); + + try { + DivertBinding divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(divertName); + assertNotNull("Divert '" + divertName + "' binding should exist.", divertBinding); + assertNotNull("The divert '" + divertName + "' should have a filter applied at first.", divertBinding.getFilter()); + + // Message with no properties SHOULD NOT be diverted. + sendDivertedTestMessage(sourceQueue, targetQueue, true, false, emptyTestMessageProperties); + // Message with property x == "x" SHOULD be diverted. + sendDivertedTestMessage(sourceQueue, targetQueue, false, true, testMessagePropertiesXX); + + // Remove filter + deployBrokerConfig(embeddedActiveMQ, "reload-divert-filter-none.xml"); + + divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(divertName); + assertNotNull("Divert '" + divertName + "' binding should exist.", divertBinding); + assertNull("The divert '" + divertName + "' should not have a filter applied after the new configuration is loaded.", divertBinding.getFilter()); + + // Message with no properties should be diverted. 
+ sendDivertedTestMessage(sourceQueue, targetQueue, false, true, emptyTestMessageProperties); + // Message with properties should be diverted too. + sendDivertedTestMessage(sourceQueue, targetQueue, false, true, testMessagePropertiesXX); + } finally { + embeddedActiveMQ.stop(); + } + } + + @Test + public void testChangeDivertFilter() throws Exception { + + final EmbeddedActiveMQ embeddedActiveMQ = createEmbeddedActiveMQServer("reload-divert-filter-x-eq-x.xml"); + final SimpleString divertName = new SimpleString("source-to-target"); + final Queue sourceQueue = (Queue) ActiveMQDestination.createDestination("queue://source", ActiveMQDestination.TYPE.QUEUE); + final Queue targetQueue = (Queue) ActiveMQDestination.createDestination("queue://target", ActiveMQDestination.TYPE.QUEUE); + final Map<String, String> testMessagePropertiesXX = Map.of("x", "x"); + final Map<String, String> testMessagePropertiesXY = Map.of("x", "y"); + + try { + DivertBinding divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(divertName); + assertNotNull("Divert '" + divertName + "' binding should exist.", divertBinding); + assertNotNull("The divert '" + divertName + "' should have a filter applied after the first configuration file is loaded.", divertBinding.getFilter()); + + // Message with property x == "x" SHOULD be diverted. + sendDivertedTestMessage(sourceQueue, targetQueue, false, true, testMessagePropertiesXX); + // Message with property x == "y" SHOULD NOT be diverted. 
+ sendDivertedTestMessage(sourceQueue, targetQueue, true, false, testMessagePropertiesXY); + + // Update filter + deployBrokerConfig(embeddedActiveMQ, "reload-divert-filter-x-eq-y.xml"); + + divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(divertName); + assertNotNull("Divert '" + divertName + "' binding should exist.", divertBinding); + assertNotNull("The divert '" + divertName + "' should have a filter applied after the second configuration file is loaded.", divertBinding.getFilter()); + + // Message with property x == "x" SHOULD NOT be diverted. + sendDivertedTestMessage(sourceQueue, targetQueue, true, false, testMessagePropertiesXX); + // Message with property x == "y" SHOULD be diverted. + sendDivertedTestMessage(sourceQueue, targetQueue, false, true, testMessagePropertiesXY); + } finally { + embeddedActiveMQ.stop(); + } + } + + @Test + public void testChangeDivertExclusivity() throws Exception { + + final EmbeddedActiveMQ embeddedActiveMQ = createEmbeddedActiveMQServer("reload-divert-exclusive.xml"); + final SimpleString divertName = new SimpleString("source-to-target"); + final Queue sourceQueue = (Queue) ActiveMQDestination.createDestination("queue://source", ActiveMQDestination.TYPE.QUEUE); + final Queue targetQueue = (Queue) ActiveMQDestination.createDestination("queue://target", ActiveMQDestination.TYPE.QUEUE); + + try { + DivertBinding divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(divertName); + assertNotNull("Divert '" + divertName + "' binding should exist.", divertBinding); + + // Message should be routed to the forwarding queue only + sendDivertedTestMessage(sourceQueue, targetQueue, false, true, Map.of()); + + // Route to both queues + deployBrokerConfig(embeddedActiveMQ, "reload-divert-non-exclusive.xml"); + sendDivertedTestMessage(sourceQueue, targetQueue, true, true, Map.of()); + + // Route to the forwarding queue only + 
deployBrokerConfig(embeddedActiveMQ, "reload-divert-exclusive.xml"); + sendDivertedTestMessage(sourceQueue, targetQueue, false, true, Map.of()); + } finally { + embeddedActiveMQ.stop(); + } + } + + @Test + public void testChangeDivertAddress() throws Exception { + + final EmbeddedActiveMQ embeddedActiveMQ = createEmbeddedActiveMQServer("reload-divert-address-source1.xml"); + final SimpleString divertName = new SimpleString("source-to-target"); + final Queue sourceQueue1 = (Queue) ActiveMQDestination.createDestination("queue://source1", ActiveMQDestination.TYPE.QUEUE); + final Queue sourceQueue2 = (Queue) ActiveMQDestination.createDestination("queue://source2", ActiveMQDestination.TYPE.QUEUE); + final Queue targetQueue = (Queue) ActiveMQDestination.createDestination("queue://target", ActiveMQDestination.TYPE.QUEUE); + + try { + DivertBinding divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(divertName); + assertNotNull("Divert '" + divertName + "' binding should exist.", divertBinding); + assertEquals("Divert '" + divertName + "' address should be '" + sourceQueue1.getQueueName() + "'.", sourceQueue1.getQueueName(), divertBinding.getAddress().toString()); + sendDivertedTestMessage(sourceQueue1, targetQueue, false, true, Map.of()); + sendDivertedTestMessage(sourceQueue2, targetQueue, true, false, Map.of()); + + deployBrokerConfig(embeddedActiveMQ, "reload-divert-address-source2.xml"); + + divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(divertName); + assertNotNull("Divert '" + divertName + "' binding should exist.", divertBinding); + assertEquals("Divert '" + divertName + "' address should have been updated to '" + sourceQueue2.getQueueName() + "'.", sourceQueue2.getQueueName(), divertBinding.getAddress().toString()); + sendDivertedTestMessage(sourceQueue1, targetQueue, true, false, Map.of()); + sendDivertedTestMessage(sourceQueue2, targetQueue, false, true, Map.of()); + } 
finally { + embeddedActiveMQ.stop(); + } + } + + // TODO: Test divert transformers: add, change, remove + @Test public void testRedeployWithFailover() throws Exception { Set<Role> original = new HashSet<>(); diff --git a/tests/integration-tests/src/test/resources/reload-divert-address-source1.xml b/tests/integration-tests/src/test/resources/reload-divert-address-source1.xml new file mode 100644 index 0000000000..a055523c80 --- /dev/null +++ b/tests/integration-tests/src/test/resources/reload-divert-address-source1.xml @@ -0,0 +1,64 @@ +<?xml version='1.0'?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + +<configuration xmlns="urn:activemq" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd"> + + <core xmlns="urn:activemq:core"> + + <name>0.0.0.0</name> + + <configuration-file-refresh-period>100</configuration-file-refresh-period> + + <persistence-enabled>false</persistence-enabled> + + <security-enabled>false</security-enabled> + + <journal-type>NIO</journal-type> + + <paging-directory>./target/data/paging</paging-directory> + + <bindings-directory>./target/data/bindings</bindings-directory> + + <journal-directory>./target/data/journal</journal-directory> + + <large-messages-directory>./target/data/large-messages</large-messages-directory> + + + <acceptors> + <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor> + </acceptors> + + <addresses> + <address name="source1"/> + <address name="source2"/> + <address name="target"/> + </addresses> + + <diverts> + <divert name="source-to-target"> + <address>source1</address> + <forwarding-address>target</forwarding-address> + <exclusive>true</exclusive> + </divert> + </diverts> + </core> +</configuration> diff --git a/tests/integration-tests/src/test/resources/reload-divert-address-source2.xml b/tests/integration-tests/src/test/resources/reload-divert-address-source2.xml new file mode 100644 index 0000000000..fbb08005a0 --- /dev/null +++ b/tests/integration-tests/src/test/resources/reload-divert-address-source2.xml @@ -0,0 +1,64 @@ +<?xml version='1.0'?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<configuration xmlns="urn:activemq" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd"> + + <core xmlns="urn:activemq:core"> + + <name>0.0.0.0</name> + + <configuration-file-refresh-period>100</configuration-file-refresh-period> + + <persistence-enabled>false</persistence-enabled> + + <security-enabled>false</security-enabled> + + <journal-type>NIO</journal-type> + + <paging-directory>./target/data/paging</paging-directory> + + <bindings-directory>./target/data/bindings</bindings-directory> + + <journal-directory>./target/data/journal</journal-directory> + + <large-messages-directory>./target/data/large-messages</large-messages-directory> + + + <acceptors> + <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor> + </acceptors> + + <addresses> + <address name="source1"/> + <address name="source2"/> + <address name="target"/> + </addresses> + + <diverts> + <divert name="source-to-target"> + <address>source2</address> + <forwarding-address>target</forwarding-address> + <exclusive>true</exclusive> + </divert> + </diverts> + </core> +</configuration> diff --git a/tests/integration-tests/src/test/resources/reload-divert-address-target1.xml b/tests/integration-tests/src/test/resources/reload-divert-address-target1.xml new file mode 100644 index 0000000000..220fee04eb --- /dev/null +++ b/tests/integration-tests/src/test/resources/reload-divert-address-target1.xml @@ -0,0 +1,64 @@ +<?xml version='1.0'?> +<!-- +Licensed to the Apache Software 
Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<configuration xmlns="urn:activemq" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd"> + + <core xmlns="urn:activemq:core"> + + <name>0.0.0.0</name> + + <configuration-file-refresh-period>100</configuration-file-refresh-period> + + <persistence-enabled>false</persistence-enabled> + + <security-enabled>false</security-enabled> + + <journal-type>NIO</journal-type> + + <paging-directory>./target/data/paging</paging-directory> + + <bindings-directory>./target/data/bindings</bindings-directory> + + <journal-directory>./target/data/journal</journal-directory> + + <large-messages-directory>./target/data/large-messages</large-messages-directory> + + + <acceptors> + <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor> + </acceptors> + + <addresses> + <address name="source"/> + <address name="target1"/> + <address name="target2"/> + </addresses> + + <diverts> + <divert name="source-to-target"> + <address>source</address> + <forwarding-address>target1</forwarding-address> + <exclusive>true</exclusive> + </divert> + </diverts> + </core> +</configuration> diff --git 
a/tests/integration-tests/src/test/resources/reload-divert-address-target2.xml b/tests/integration-tests/src/test/resources/reload-divert-address-target2.xml new file mode 100644 index 0000000000..1b3bf2c858 --- /dev/null +++ b/tests/integration-tests/src/test/resources/reload-divert-address-target2.xml @@ -0,0 +1,64 @@ +<?xml version='1.0'?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + +<configuration xmlns="urn:activemq" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd"> + + <core xmlns="urn:activemq:core"> + + <name>0.0.0.0</name> + + <configuration-file-refresh-period>100</configuration-file-refresh-period> + + <persistence-enabled>false</persistence-enabled> + + <security-enabled>false</security-enabled> + + <journal-type>NIO</journal-type> + + <paging-directory>./target/data/paging</paging-directory> + + <bindings-directory>./target/data/bindings</bindings-directory> + + <journal-directory>./target/data/journal</journal-directory> + + <large-messages-directory>./target/data/large-messages</large-messages-directory> + + + <acceptors> + <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor> + </acceptors> + + <addresses> + <address name="source"/> + <address name="target1"/> + <address name="target2"/> + </addresses> + + <diverts> + <divert name="source-to-target"> + <address>source</address> + <forwarding-address>target2</forwarding-address> + <exclusive>true</exclusive> + </divert> + </diverts> + </core> +</configuration> diff --git a/tests/integration-tests/src/test/resources/reload-divert-exclusive.xml b/tests/integration-tests/src/test/resources/reload-divert-exclusive.xml new file mode 100644 index 0000000000..c8a0054be2 --- /dev/null +++ b/tests/integration-tests/src/test/resources/reload-divert-exclusive.xml @@ -0,0 +1,63 @@ +<?xml version='1.0'?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<configuration xmlns="urn:activemq" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd"> + + <core xmlns="urn:activemq:core"> + + <name>0.0.0.0</name> + + <configuration-file-refresh-period>100</configuration-file-refresh-period> + + <persistence-enabled>false</persistence-enabled> + + <security-enabled>false</security-enabled> + + <journal-type>NIO</journal-type> + + <paging-directory>./target/data/paging</paging-directory> + + <bindings-directory>./target/data/bindings</bindings-directory> + + <journal-directory>./target/data/journal</journal-directory> + + <large-messages-directory>./target/data/large-messages</large-messages-directory> + + + <acceptors> + <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor> + </acceptors> + + <addresses> + <address name="source"/> + <address name="target"/> + </addresses> + + <diverts> + <divert name="source-to-target"> + <address>source</address> + <forwarding-address>target</forwarding-address> + <exclusive>true</exclusive> + </divert> + </diverts> + </core> +</configuration> diff --git a/tests/integration-tests/src/test/resources/reload-divert-filter-none.xml b/tests/integration-tests/src/test/resources/reload-divert-filter-none.xml new file mode 100644 index 0000000000..c8a0054be2 --- /dev/null +++ b/tests/integration-tests/src/test/resources/reload-divert-filter-none.xml @@ -0,0 +1,63 @@ +<?xml version='1.0'?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more 
contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<configuration xmlns="urn:activemq" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd"> + + <core xmlns="urn:activemq:core"> + + <name>0.0.0.0</name> + + <configuration-file-refresh-period>100</configuration-file-refresh-period> + + <persistence-enabled>false</persistence-enabled> + + <security-enabled>false</security-enabled> + + <journal-type>NIO</journal-type> + + <paging-directory>./target/data/paging</paging-directory> + + <bindings-directory>./target/data/bindings</bindings-directory> + + <journal-directory>./target/data/journal</journal-directory> + + <large-messages-directory>./target/data/large-messages</large-messages-directory> + + + <acceptors> + <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor> + </acceptors> + + <addresses> + <address name="source"/> + <address name="target"/> + </addresses> + + <diverts> + <divert name="source-to-target"> + <address>source</address> + <forwarding-address>target</forwarding-address> + <exclusive>true</exclusive> + </divert> + </diverts> + </core> +</configuration> diff --git a/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-x.xml 
b/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-x.xml new file mode 100644 index 0000000000..e748824490 --- /dev/null +++ b/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-x.xml @@ -0,0 +1,64 @@ +<?xml version='1.0'?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + +<configuration xmlns="urn:activemq" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd"> + + <core xmlns="urn:activemq:core"> + + <name>0.0.0.0</name> + + <configuration-file-refresh-period>100</configuration-file-refresh-period> + + <persistence-enabled>false</persistence-enabled> + + <security-enabled>false</security-enabled> + + <journal-type>NIO</journal-type> + + <paging-directory>./target/data/paging</paging-directory> + + <bindings-directory>./target/data/bindings</bindings-directory> + + <journal-directory>./target/data/journal</journal-directory> + + <large-messages-directory>./target/data/large-messages</large-messages-directory> + + + <acceptors> + <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor> + </acceptors> + + <addresses> + <address name="source"/> + <address name="target"/> + </addresses> + + <diverts> + <divert name="source-to-target"> + <address>source</address> + <forwarding-address>target</forwarding-address> + <filter string="x = 'x'" /> + <exclusive>true</exclusive> + </divert> + </diverts> + </core> +</configuration> diff --git a/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-y.xml b/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-y.xml new file mode 100644 index 0000000000..78519664f2 --- /dev/null +++ b/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-y.xml @@ -0,0 +1,64 @@ +<?xml version='1.0'?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<configuration xmlns="urn:activemq" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd"> + + <core xmlns="urn:activemq:core"> + + <name>0.0.0.0</name> + + <configuration-file-refresh-period>100</configuration-file-refresh-period> + + <persistence-enabled>false</persistence-enabled> + + <security-enabled>false</security-enabled> + + <journal-type>NIO</journal-type> + + <paging-directory>./target/data/paging</paging-directory> + + <bindings-directory>./target/data/bindings</bindings-directory> + + <journal-directory>./target/data/journal</journal-directory> + + <large-messages-directory>./target/data/large-messages</large-messages-directory> + + + <acceptors> + <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor> + </acceptors> + + <addresses> + <address name="source"/> + <address name="target"/> + </addresses> + + <diverts> + <divert name="source-to-target"> + <address>source</address> + <forwarding-address>target</forwarding-address> + <filter string="x = 'y'" /> + <exclusive>true</exclusive> + </divert> + </diverts> + </core> +</configuration> diff --git a/tests/integration-tests/src/test/resources/reload-divert-non-exclusive.xml b/tests/integration-tests/src/test/resources/reload-divert-non-exclusive.xml new file mode 100644 index 0000000000..df5c5a9836 --- /dev/null +++ b/tests/integration-tests/src/test/resources/reload-divert-non-exclusive.xml @@ -0,0 +1,63 @@ +<?xml version='1.0'?> +<!-- +Licensed to the Apache Software 
Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<configuration xmlns="urn:activemq" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd"> + + <core xmlns="urn:activemq:core"> + + <name>0.0.0.0</name> + + <configuration-file-refresh-period>100</configuration-file-refresh-period> + + <persistence-enabled>false</persistence-enabled> + + <security-enabled>false</security-enabled> + + <journal-type>NIO</journal-type> + + <paging-directory>./target/data/paging</paging-directory> + + <bindings-directory>./target/data/bindings</bindings-directory> + + <journal-directory>./target/data/journal</journal-directory> + + <large-messages-directory>./target/data/large-messages</large-messages-directory> + + + <acceptors> + <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor> + </acceptors> + + <addresses> + <address name="source"/> + <address name="target"/> + </addresses> + + <diverts> + <divert name="source-to-target"> + <address>source</address> + <forwarding-address>target</forwarding-address> + <exclusive>false</exclusive> + </divert> + </diverts> + </core> +</configuration>
