This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 6c02950db3 ARTEMIS-4510 Add auto-create-destination logic to diverts
6c02950db3 is described below
commit 6c02950db386b0dcb7b9539a0f63f20fa5be00cd
Author: AntonRoskvist <[email protected]>
AuthorDate: Mon Apr 8 11:42:34 2024 +0200
ARTEMIS-4510 Add auto-create-destination logic to diverts
---
.../artemis/core/server/impl/DivertImpl.java | 2 +-
.../tests/integration/divert/DivertTest.java | 142 +++++++++++++++++++++
2 files changed, 143 insertions(+), 1 deletion(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index 485cc911ba..c499a28d9d 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -140,7 +140,7 @@ public class DivertImpl implements Divert {
copy = message;
}
- postOffice.route(copy, new
RoutingContextImpl(context.getTransaction()).setReusable(false).setRoutingType(copy.getRoutingType()),
false);
+ postOffice.route(copy, new
RoutingContextImpl(context.getTransaction()).setReusable(false).setRoutingType(copy.getRoutingType()).setServerSession(context.getServerSession()),
false);
}
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
index 355c3c4541..76a00aad29 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import
org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
@@ -51,6 +52,7 @@ import
org.apache.activemq.artemis.core.server.ActiveMQServers;
import
org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.QueueManagerImpl;
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -58,6 +60,7 @@ import
org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Assert;
import org.junit.Test;
@@ -1775,4 +1778,143 @@ public class DivertTest extends ActiveMQTestBase {
Assert.assertEquals("testAddress" + COUNT, message.getAddress());
Assert.assertEquals("testAddress" + (COUNT - 1),
message.getStringProperty(Message.HDR_ORIGINAL_ADDRESS));
}
+
+ @Test
+ public void testDivertToNewAddress() throws Exception {
+ final String queueName = "queue";
+ final String dummyQueueName = "dummy";
+ final String noDivertAutoCreateQName = "notAllowed";
+ final String propKey = "newQueue";
+ final String DIVERT = "myDivert";
+ final int numMessages = 10;
+
+ Transformer transformer = message ->
message.setAddress(message.getStringProperty(propKey));
+
+ ServiceRegistryImpl serviceRegistry = new ServiceRegistryImpl();
+ serviceRegistry.addDivertTransformer(DIVERT, transformer);
+
+ AddressSettings autoCreateDestinationsAS = new
AddressSettings().setAutoCreateAddresses(true).setAutoCreateQueues(true);
+ AddressSettings noAutoCreateDestinationsAS = new
AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false);
+
+ ActiveMQServer server = addServer(new
ActiveMQServerImpl(createDefaultInVMConfig(), null, null, null,
serviceRegistry));
+
+ server.getConfiguration().addAddressSetting("#",
autoCreateDestinationsAS);
+ server.getConfiguration().addAddressSetting(noDivertAutoCreateQName,
noAutoCreateDestinationsAS);
+
+ server.start();
+
+ server.createQueue(new QueueConfiguration(queueName));
+ server.deployDivert(new DivertConfiguration()
+ .setName(DIVERT)
+ .setAddress(queueName)
+
.setRoutingType(ComponentConfigurationRoutingType.ANYCAST)
+ .setForwardingAddress(dummyQueueName)
+ .setExclusive(true));
+
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory sf = createSessionFactory(locator);
+ ClientSession session = sf.createSession(false, true, true);
+ session.start();
+
+ ClientMessage message;
+ ClientProducer producer = session.createProducer(queueName);
+
+ for (int i = 0; i < numMessages; i++) {
+ message = session.createMessage(true);
+ message.putStringProperty(propKey, queueName + "." + i);
+ producer.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++) {
+ ClientConsumer consumer = session.createConsumer(queueName + "." + i);
+ message = consumer.receive(DivertTest.TIMEOUT);
+ Assert.assertNotNull(message);
+ message.acknowledge();
+ consumer.close();
+ }
+
+ ClientMessage failMessage = session.createMessage(true);
+
+ assertThrows(ActiveMQAddressDoesNotExistException.class, () -> {
+ failMessage.putStringProperty(propKey, noDivertAutoCreateQName);
+ producer.send(failMessage);
+ });
+
+ producer.close();
+
+ Assert.assertNull(server.locateQueue(noDivertAutoCreateQName));
+ Assert.assertNull(server.locateQueue(dummyQueueName));
+
+ }
+
+ @Test
+ public void testHandleAutoDeleteDestination() throws Exception {
+ final String testAddress = "testAddress";
+ final String forwardAddress = "forwardAddress";
+
+ DivertConfiguration divertConf = new DivertConfiguration()
+ .setName("divert")
+ .setRoutingType(ComponentConfigurationRoutingType.ANYCAST)
+ .setExclusive(true)
+ .setAddress(testAddress)
+ .setForwardingAddress(forwardAddress);
+
+ AddressSettings addressSettings = new AddressSettings()
+ .setAutoCreateAddresses(true)
+ .setAutoCreateQueues(true)
+ .setAutoDeleteAddresses(true)
+ .setAutoDeleteQueues(true);
+
+ Configuration config =
createDefaultInVMConfig().addDivertConfiguration(divertConf).addAddressSetting("#",
addressSettings);
+ ActiveMQServer server =
addServer(ActiveMQServers.newActiveMQServer(config, false));
+ server.start();
+
+ ServerLocator locator = createInVMNonHALocator();
+ ClientSessionFactory sf = createSessionFactory(locator);
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(new
QueueConfiguration(testAddress).setAddress(testAddress).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+ session.createQueue(new
QueueConfiguration(forwardAddress).setAddress(forwardAddress).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+ session.start();
+
+ ClientProducer producer = session.createProducer(testAddress);
+ ClientConsumer consumer = session.createConsumer(forwardAddress);
+
+ final int numMessages = 5;
+
+ for (int i = 0; i < numMessages; i++) {
+ ClientMessage message = session.createMessage(true);
+ message.setRoutingType(RoutingType.ANYCAST);
+ producer.send(message);
+ }
+
+ for (int i = 0; i < numMessages; i++) {
+ ClientMessage message = consumer.receive(DivertTest.TIMEOUT);
+ Assert.assertNotNull(message);
+ message.acknowledge();
+ }
+
+ Assert.assertNull(consumer.receiveImmediate());
+ consumer.close();
+
+ //Trigger autoDelete instead of waiting
+ QueueManagerImpl.performAutoDeleteQueue(server,
server.locateQueue(forwardAddress));
+ Wait.assertTrue(() ->
server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(forwardAddress))
+ .getBindingRemovedTimestamp() != -1, DivertTest.TIMEOUT, 100);
+
+ for (int i = 0; i < numMessages; i++) {
+ ClientMessage message = session.createMessage(true);
+ producer.send(message);
+ }
+
+ consumer = session.createConsumer(forwardAddress);
+ for (int i = 0; i < numMessages; i++) {
+ ClientMessage message = consumer.receive(DivertTest.TIMEOUT);
+ Assert.assertNotNull(message);
+ message.acknowledge();
+ }
+
+ Assert.assertNull(consumer.receiveImmediate());
+ }
+
}