This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c02950db3 ARTEMIS-4510 Add auto-create-destination logic to diverts
6c02950db3 is described below

commit 6c02950db386b0dcb7b9539a0f63f20fa5be00cd
Author: AntonRoskvist <[email protected]>
AuthorDate: Mon Apr 8 11:42:34 2024 +0200

    ARTEMIS-4510 Add auto-create-destination logic to diverts
---
 .../artemis/core/server/impl/DivertImpl.java       |   2 +-
 .../tests/integration/divert/DivertTest.java       | 142 +++++++++++++++++++++
 2 files changed, 143 insertions(+), 1 deletion(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index 485cc911ba..c499a28d9d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -140,7 +140,7 @@ public class DivertImpl implements Divert {
             copy = message;
          }
 
-         postOffice.route(copy, new RoutingContextImpl(context.getTransaction()).setReusable(false).setRoutingType(copy.getRoutingType()), false);
+         postOffice.route(copy, new RoutingContextImpl(context.getTransaction()).setReusable(false).setRoutingType(copy.getRoutingType()).setServerSession(context.getServerSession()), false);
       }
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
index 355c3c4541..76a00aad29 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
@@ -51,6 +52,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServers;
 import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
 import org.apache.activemq.artemis.core.server.Divert;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.QueueManagerImpl;
 import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
 import org.apache.activemq.artemis.core.server.transformer.Transformer;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -58,6 +60,7 @@ import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.junit.Assert;
 import org.junit.Test;
@@ -1775,4 +1778,143 @@ public class DivertTest extends ActiveMQTestBase {
       Assert.assertEquals("testAddress" + COUNT, message.getAddress());
       Assert.assertEquals("testAddress" + (COUNT - 1), message.getStringProperty(Message.HDR_ORIGINAL_ADDRESS));
    }
+
+   @Test
+   public void testDivertToNewAddress() throws Exception {
+      final String queueName = "queue";
+      final String dummyQueueName = "dummy";
+      final String noDivertAutoCreateQName = "notAllowed";
+      final String propKey = "newQueue";
+      final String DIVERT = "myDivert";
+      final int numMessages = 10;
+
+      Transformer transformer = message -> message.setAddress(message.getStringProperty(propKey));
+
+      ServiceRegistryImpl serviceRegistry = new ServiceRegistryImpl();
+      serviceRegistry.addDivertTransformer(DIVERT, transformer);
+
+      AddressSettings autoCreateDestinationsAS = new AddressSettings().setAutoCreateAddresses(true).setAutoCreateQueues(true);
+      AddressSettings noAutoCreateDestinationsAS = new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false);
+
+      ActiveMQServer server = addServer(new ActiveMQServerImpl(createDefaultInVMConfig(), null, null, null, serviceRegistry));
+
+      server.getConfiguration().addAddressSetting("#", autoCreateDestinationsAS);
+      server.getConfiguration().addAddressSetting(noDivertAutoCreateQName, noAutoCreateDestinationsAS);
+
+      server.start();
+
+      server.createQueue(new QueueConfiguration(queueName));
+      server.deployDivert(new DivertConfiguration()
+                             .setName(DIVERT)
+                             .setAddress(queueName)
+                             .setRoutingType(ComponentConfigurationRoutingType.ANYCAST)
+                             .setForwardingAddress(dummyQueueName)
+                             .setExclusive(true));
+
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory sf = createSessionFactory(locator);
+      ClientSession session = sf.createSession(false, true, true);
+      session.start();
+
+      ClientMessage message;
+      ClientProducer producer = session.createProducer(queueName);
+
+      for (int i = 0; i < numMessages; i++) {
+         message = session.createMessage(true);
+         message.putStringProperty(propKey, queueName + "." + i);
+         producer.send(message);
+      }
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientConsumer consumer = session.createConsumer(queueName + "." + i);
+         message = consumer.receive(DivertTest.TIMEOUT);
+         Assert.assertNotNull(message);
+         message.acknowledge();
+         consumer.close();
+      }
+
+      ClientMessage failMessage = session.createMessage(true);
+
+      assertThrows(ActiveMQAddressDoesNotExistException.class, () -> {
+         failMessage.putStringProperty(propKey, noDivertAutoCreateQName);
+         producer.send(failMessage);
+      });
+
+      producer.close();
+
+      Assert.assertNull(server.locateQueue(noDivertAutoCreateQName));
+      Assert.assertNull(server.locateQueue(dummyQueueName));
+
+   }
+
+   @Test
+   public void testHandleAutoDeleteDestination() throws Exception {
+      final String testAddress = "testAddress";
+      final String forwardAddress = "forwardAddress";
+
+      DivertConfiguration divertConf = new DivertConfiguration()
+         .setName("divert")
+         .setRoutingType(ComponentConfigurationRoutingType.ANYCAST)
+         .setExclusive(true)
+         .setAddress(testAddress)
+         .setForwardingAddress(forwardAddress);
+
+      AddressSettings addressSettings = new AddressSettings()
+         .setAutoCreateAddresses(true)
+         .setAutoCreateQueues(true)
+         .setAutoDeleteAddresses(true)
+         .setAutoDeleteQueues(true);
+
+      Configuration config = createDefaultInVMConfig().addDivertConfiguration(divertConf).addAddressSetting("#", addressSettings);
+      ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, false));
+      server.start();
+
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory sf = createSessionFactory(locator);
+      ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(new QueueConfiguration(testAddress).setAddress(testAddress).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+      session.createQueue(new QueueConfiguration(forwardAddress).setAddress(forwardAddress).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+      session.start();
+
+      ClientProducer producer = session.createProducer(testAddress);
+      ClientConsumer consumer = session.createConsumer(forwardAddress);
+
+      final int numMessages = 5;
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = session.createMessage(true);
+         message.setRoutingType(RoutingType.ANYCAST);
+         producer.send(message);
+      }
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = consumer.receive(DivertTest.TIMEOUT);
+         Assert.assertNotNull(message);
+         message.acknowledge();
+      }
+
+      Assert.assertNull(consumer.receiveImmediate());
+      consumer.close();
+
+      //Trigger autoDelete instead of waiting
+      QueueManagerImpl.performAutoDeleteQueue(server, server.locateQueue(forwardAddress));
+      Wait.assertTrue(() -> server.getPostOffice().getAddressInfo(SimpleString.toSimpleString(forwardAddress))
+         .getBindingRemovedTimestamp() != -1, DivertTest.TIMEOUT, 100);
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = session.createMessage(true);
+         producer.send(message);
+      }
+
+      consumer = session.createConsumer(forwardAddress);
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = consumer.receive(DivertTest.TIMEOUT);
+         Assert.assertNotNull(message);
+         message.acknowledge();
+      }
+
+      Assert.assertNull(consumer.receiveImmediate());
+   }
+
 }

Reply via email to