This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.27.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit b471392872cf15c2a29f2b3c4e9a9ca4a9586e49
Author: Clebert Suconic <[email protected]>
AuthorDate: Fri Nov 11 07:24:15 2022 -0500

    ARTEMIS-4089 Check on AutoCreation during routing
    
    (cherry picked from commit 4f79eb42f53071e3606a4848ec72dd164a88e350)
---
 .../protocol/amqp/broker/AMQPSessionCallback.java  |  48 +---
 .../core/postoffice/impl/PostOfficeImpl.java       |  94 +++++--
 .../artemis/core/server/RoutingContext.java        |   4 +
 .../artemis/core/server/ServerSession.java         |   2 +
 .../core/server/impl/RoutingContextImpl.java       |  30 ++-
 .../core/server/impl/ServerSessionImpl.java        |  57 +++-
 .../integration/amqp/AmqpExpiredMessageTest.java   |   9 +
 .../integration/client/DeleteAddressTest.java      | 291 +++++++++++++++++++++
 .../tests/integration/client/LargeMessageTest.java |   3 +-
 .../integration/management/AddressControlTest.java |   1 +
 10 files changed, 465 insertions(+), 74 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index efa25c89cb..637cdf10f2 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -20,7 +20,6 @@ import java.util.Map;
 import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
-import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
@@ -35,7 +34,6 @@ import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.security.SecurityAuth;
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
@@ -47,7 +45,6 @@ import org.apache.activemq.artemis.core.server.ServerProducer;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
@@ -332,45 +329,8 @@ public class AMQPSessionCallback implements 
SessionCallback {
    }
 
 
-
    public boolean checkAddressAndAutocreateIfPossible(SimpleString address, 
RoutingType routingType) throws Exception {
-      boolean result = false;
-      SimpleString unPrefixedAddress = serverSession.removePrefix(address);
-      AddressSettings addressSettings = 
manager.getServer().getAddressSettingsRepository().getMatch(unPrefixedAddress.toString());
-
-      if (routingType == RoutingType.MULTICAST) {
-         if (manager.getServer().getAddressInfo(unPrefixedAddress) == null) {
-            if (addressSettings.isAutoCreateAddresses()) {
-               try {
-                  serverSession.createAddress(address, routingType, true);
-               } catch (ActiveMQAddressExistsException e) {
-                  // The address may have been created by another thread in 
the mean time.  Catch and do nothing.
-               }
-               result = true;
-            }
-         } else {
-            result = true;
-         }
-      } else if (routingType == RoutingType.ANYCAST) {
-         if (manager.getServer().locateQueue(unPrefixedAddress) == null) {
-            Bindings bindings = 
manager.getServer().getPostOffice().lookupBindingsForAddress(address);
-            if (bindings != null) {
-               // this means the address has another queue with a different 
name, which is fine, we just ignore it on this case
-               result = true;
-            } else if (addressSettings.isAutoCreateQueues()) {
-               try {
-                  serverSession.createQueue(new 
QueueConfiguration(address).setRoutingType(routingType).setAutoCreated(true));
-               } catch (ActiveMQQueueExistsException e) {
-                  // The queue may have been created by another thread in the 
mean time.  Catch and do nothing.
-               }
-               result = true;
-            }
-         } else {
-            result = true;
-         }
-      }
-
-      return result;
+      return serverSession.checkAutoCreate(address, routingType);
    }
 
    public AddressQueryResult addressQuery(SimpleString addressName,
@@ -506,7 +466,11 @@ public class AMQPSessionCallback implements 
SessionCallback {
 
       //here check queue-autocreation
       if (!checkAddressAndAutocreateIfPossible(address, routingType)) {
-         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
+         ActiveMQException e = 
ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
+         if (transaction != null) {
+            transaction.markAsRollbackOnly(e);
+         }
+         throw e;
       }
 
       OperationContext oldcontext = recoverContext();
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 5949ad8448..b9c60f1fb3 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -38,6 +38,7 @@ import java.util.stream.Stream;
 import 
org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
 import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
 import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.artemis.api.core.ActiveMQShutdownException;
 import org.apache.activemq.artemis.api.core.Message;
@@ -1154,7 +1155,9 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
       }
       message.clearInternalProperties();
       Bindings bindings;
-      final AddressInfo addressInfo = addressManager.getAddressInfo(address);
+      final AddressInfo addressInfo = checkAddress(context, address);
+
+      final RoutingStatus status;
       if (bindingMove != null) {
          context.clear();
          context.setReusable(false);
@@ -1162,18 +1165,28 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
          if (addressInfo != null) {
             addressInfo.incrementRoutedMessageCount();
          }
-      } else if ((bindings = 
addressManager.getBindingsForRoutingAddress(address)) != null) {
-         bindings.route(message, context);
-         if (addressInfo != null) {
-            addressInfo.incrementRoutedMessageCount();
-         }
+         status = RoutingStatus.OK;
       } else {
-         context.setReusable(false);
-         if (addressInfo != null) {
-            addressInfo.incrementUnRoutedMessageCount();
+         bindings = simpleRoute(address, context, message, addressInfo);
+         if (logger.isDebugEnabled()) {
+            if (bindings != null) {
+               logger.debug("PostOffice::simpleRoute returned bindings with 
size = {}", bindings.getBindings().size());
+            } else {
+               logger.debug("PostOffice::simpleRoute null as bindings");
+            }
+         }
+         if (bindings == null) {
+            context.setReusable(false);
+            context.clear();
+            if (addressInfo != null) {
+               addressInfo.incrementUnRoutedMessageCount();
+            }
+            // this is a debug and not warn because this could be a regular 
scenario on publish-subscribe queues (or topic subscriptions on JMS)
+            logger.debug("Couldn't find any bindings for address={} on 
message={}", address, message);
+            status = RoutingStatus.NO_BINDINGS;
+         } else {
+            status = RoutingStatus.OK;
          }
-         // this is a debug and not warn because this could be a regular 
scenario on publish-subscribe queues (or topic subscriptions on JMS)
-         logger.debug("Couldn't find any bindings for address={} on 
message={}", address, message);
       }
 
       if (server.hasBrokerMessagePlugins()) {
@@ -1182,14 +1195,20 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
 
       logger.trace("Message after routed={}\n{}", message, context);
 
+      final RoutingStatus finalStatus;
       try {
-         final RoutingStatus status;
-         if (context.getQueueCount() == 0) {
-            status = maybeSendToDLA(message, context, address, sendToDLA);
+         if ( status == RoutingStatus.NO_BINDINGS) {
+            finalStatus = maybeSendToDLA(message, context, address, sendToDLA);
          } else {
-            status = RoutingStatus.OK;
+            finalStatus = status;
             try {
-               processRoute(message, context, direct);
+               if (context.getQueueCount() > 0) {
+                  processRoute(message, context, direct);
+               } else {
+                  if (message.isLargeMessage()) {
+                     ((LargeServerMessage) message).deleteFile();
+                  }
+               }
             } catch (ActiveMQAddressFullException e) {
                if (startedTX) {
                   context.getTransaction().rollback();
@@ -1203,9 +1222,9 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
             context.getTransaction().commit();
          }
          if (server.hasBrokerMessagePlugins()) {
-            server.callBrokerMessagePlugins(plugin -> 
plugin.afterMessageRoute(message, context, direct, rejectDuplicates, status));
+            server.callBrokerMessagePlugins(plugin -> 
plugin.afterMessageRoute(message, context, direct, rejectDuplicates, 
finalStatus));
          }
-         return status;
+         return finalStatus;
       } catch (Exception e) {
          if (server.hasBrokerMessagePlugins()) {
             server.callBrokerMessagePlugins(plugin -> 
plugin.onMessageRouteException(message, context, direct, rejectDuplicates, e));
@@ -1214,6 +1233,45 @@ public class PostOfficeImpl implements PostOffice, 
NotificationListener, Binding
       }
    }
 
+   private AddressInfo checkAddress(RoutingContext context, SimpleString 
address) throws Exception {
+      AddressInfo addressInfo = addressManager.getAddressInfo(address);
+      if (addressInfo == null && context.getServerSession() != null) {
+         if (context.getServerSession().checkAutoCreate(address, 
context.getRoutingType())) {
+            addressInfo = addressManager.getAddressInfo(address);
+         } else {
+            ActiveMQException ex = 
ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address);
+            if (context.getTransaction() != null) {
+               context.getTransaction().markAsRollbackOnly(ex);
+            }
+            throw ex;
+         }
+      }
+      return addressInfo;
+   }
+
+   Bindings simpleRoute(SimpleString address, RoutingContext context, Message 
message, AddressInfo addressInfo) throws Exception {
+      Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
+      if (bindings == null && context.getServerSession() != null) {
+         if (!context.getServerSession().checkAutoCreate(address, 
context.getRoutingType())) {
+            ActiveMQException e = 
ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address);
+            Transaction tx = context.getTransaction();
+            if (tx != null) {
+               tx.markAsRollbackOnly(e);
+            }
+            throw e;
+         }
+         bindings = addressManager.getBindingsForRoutingAddress(address);
+      }
+      if (bindings != null) {
+         bindings.route(message, context);
+         if (addressInfo != null) {
+            addressInfo.incrementRoutedMessageCount();
+         }
+      }
+      return bindings;
+   }
+
+
    private RoutingStatus maybeSendToDLA(final Message message,
                                         final RoutingContext context,
                                         final SimpleString address,
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
index f6cff3d1be..d95c7ae9d8 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
@@ -103,5 +103,9 @@ public interface RoutingContext {
 
    MessageLoadBalancingType getLoadBalancingType();
 
+   RoutingContext setServerSession(ServerSession session);
+
+   ServerSession getServerSession();
+
 
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index eff638f1e0..f238a4925d 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -110,6 +110,8 @@ public interface ServerSession extends SecurityAuth {
 
    void addCloseable(Closeable closeable);
 
+   boolean checkAutoCreate(SimpleString address, RoutingType routingType) 
throws Exception;
+
    ServerConsumer createConsumer(long consumerID,
                                  SimpleString queueName,
                                  SimpleString filterString,
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
index c047171154..1220190d12 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -30,6 +29,7 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RouteContextList;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.server.ServerSession;
 import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.mirror.MirrorController;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -65,10 +65,10 @@ public class RoutingContextImpl implements RoutingContext {
 
    boolean mirrorDisabled = false;
 
-   private final Executor executor;
-
    private boolean duplicateDetection = true;
 
+   private ServerSession serverSession;
+
    @Override
    public boolean isDuplicateDetection() {
       return duplicateDetection;
@@ -81,12 +81,7 @@ public class RoutingContextImpl implements RoutingContext {
    }
 
    public RoutingContextImpl(final Transaction transaction) {
-      this(transaction, null);
-   }
-
-   public RoutingContextImpl(final Transaction transaction, Executor executor) 
{
       this.transaction = transaction;
-      this.executor = executor;
    }
 
    @Override
@@ -121,7 +116,7 @@ public class RoutingContextImpl implements RoutingContext {
    }
 
    @Override
-   public RoutingContext setReusable(boolean reusable) {
+   public RoutingContextImpl setReusable(boolean reusable) {
       if (this.reusable != null && !this.reusable.booleanValue()) {
          // cannot set to Reusable once it was set to false
          return this;
@@ -131,7 +126,7 @@ public class RoutingContextImpl implements RoutingContext {
       return this;
    }
    @Override
-   public RoutingContext setReusable(boolean reusable, int previousBindings) {
+   public RoutingContextImpl setReusable(boolean reusable, int 
previousBindings) {
       this.version = previousBindings;
       this.previousAddress = address;
       this.previousRoutingType = routingType;
@@ -144,7 +139,7 @@ public class RoutingContextImpl implements RoutingContext {
    }
 
    @Override
-   public RoutingContext clear() {
+   public RoutingContextImpl clear() {
       map.clear();
 
       queueCount = 0;
@@ -252,7 +247,7 @@ public class RoutingContextImpl implements RoutingContext {
    }
 
    @Override
-   public RoutingContext setRoutingType(RoutingType routingType) {
+   public RoutingContextImpl setRoutingType(RoutingType routingType) {
       if (this.routingType == null && routingType != null || this.routingType 
!= routingType) {
          this.clear();
       }
@@ -313,6 +308,17 @@ public class RoutingContextImpl implements RoutingContext {
       return getContextListing(address).getDurableQueues();
    }
 
+   @Override
+   public RoutingContextImpl setServerSession(ServerSession session) {
+      this.serverSession = session;
+      return this;
+   }
+
+   @Override
+   public ServerSession getServerSession() {
+      return serverSession;
+   }
+
    @Override
    public int getQueueCount() {
       return queueCount;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 67201d3c7e..ab77a89f61 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
+import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
+import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.json.JsonArrayBuilder;
 import org.apache.activemq.artemis.json.JsonObjectBuilder;
 import java.security.cert.X509Certificate;
@@ -169,7 +172,7 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
 
    private final SimpleString managementAddress;
 
-   protected final RoutingContext routingContext = new 
RoutingContextImpl(null);
+   protected final RoutingContext routingContext = new 
RoutingContextImpl(null).setServerSession(this);
 
    protected final SessionCallback callback;
 
@@ -1737,6 +1740,55 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
       return tx;
    }
 
+
+   @Override
+   public boolean checkAutoCreate(SimpleString address, RoutingType 
routingType) throws Exception {
+      boolean result;
+      SimpleString unPrefixedAddress = removePrefix(address);
+      AddressSettings addressSettings =  
server.getAddressSettingsRepository().getMatch(unPrefixedAddress.toString());
+
+      if (routingType == RoutingType.MULTICAST) {
+         if (server.getAddressInfo(unPrefixedAddress) == null) {
+            if (addressSettings.isAutoCreateAddresses()) {
+               try {
+                  createAddress(address, routingType, true);
+               } catch (ActiveMQAddressExistsException e) {
+                  // The address may have been created by another thread in 
the meantime.  Catch and do nothing.
+               }
+               result = true;
+            } else {
+               result = false;
+            }
+         } else {
+            result = true;
+         }
+      } else if (routingType == RoutingType.ANYCAST) {
+         if (server.locateQueue(unPrefixedAddress) == null) {
+            Bindings bindings = 
server.getPostOffice().lookupBindingsForAddress(address);
+            if (bindings != null) {
+               // this means the address has another queue with a different 
name, which is fine, we just ignore it in this case
+               result = true;
+            } else if (addressSettings.isAutoCreateQueues()) {
+               try {
+                  createQueue(new 
QueueConfiguration(address).setRoutingType(routingType).setAutoCreated(true));
+               } catch (ActiveMQQueueExistsException e) {
+                  // The queue may have been created by another thread in the 
meantime.  Catch and do nothing.
+               }
+               result = true;
+            } else {
+               result = false;
+            }
+         } else {
+            result = true;
+         }
+      } else {
+         result = true;
+      }
+
+      return result;
+   }
+
+
    @Override
    public RoutingStatus send(final Message message, final boolean direct) 
throws Exception {
       return send(message, direct, false);
@@ -2218,6 +2270,8 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
 
          result = postOffice.route(msg, routingContext, direct);
 
+         logger.debug("Routing result for {} = {}", msg, result);
+
          Pair<Object, AtomicLong> value = 
targetAddressInfos.get(msg.getAddressSimpleString());
 
          if (value == null) {
@@ -2231,6 +2285,7 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
             routingContext.clear();
          }
       }
+
       return result;
    }
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
index f98ea8067c..71f397aad5 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
@@ -23,6 +23,7 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -49,9 +50,13 @@ import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
 
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
    @Test(timeout = 60000)
    public void testSendMessageThatIsAlreadyExpiredUsingAbsoluteTime() throws 
Exception {
       AmqpClient client = createAmqpClient();
@@ -568,7 +573,11 @@ public class AmqpExpiredMessageTest extends 
AmqpClientTestSupport {
          message.setText("Test-Message");
          message.setDeliveryAnnotation("shouldDisappear", 1);
          message.setMessageAnnotation("x-opt-routing-type", (byte) 1);
+
+         
logger.debug("*******************************************************************************************************************************");
+         logger.debug("message being sent {}", message);
          sender.send(message);
+         
logger.debug("*******************************************************************************************************************************");
 
          Queue forward = getProxyToQueue(FORWARDING_ADDRESS);
          assertTrue("Message not diverted", Wait.waitFor(() -> 
forward.getMessageCount() > 0, 7000, 500));
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/DeleteAddressTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/DeleteAddressTest.java
new file mode 100644
index 0000000000..dece936305
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/DeleteAddressTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import java.lang.invoke.MethodHandles;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DeleteAddressTest extends ActiveMQTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   ActiveMQServer server;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+   }
+
+   private void localServer(boolean autoCreate) throws Exception {
+      server = createServer(false, true);
+
+      AddressSettings settings = new 
AddressSettings().setAutoDeleteAddresses(autoCreate).setAutoCreateAddresses(autoCreate).setAutoCreateQueues(autoCreate).setAutoDeleteQueues(autoCreate).setDeadLetterAddress(SimpleString.toSimpleString("DLQ")).setSendToDLAOnNoRoute(true);
+      server.start();
+      server.createQueue(new 
QueueConfiguration("DLQ").setRoutingType(RoutingType.ANYCAST));
+      server.getAddressSettingsRepository().addMatch(getName() + "*", 
settings);
+   }
+
+   @Test
+   public void testQueueNoAutoCreateCore() throws Exception {
+      internalQueueTest("CORE", false);
+   }
+
+   @Test
+   public void testQueueNoAutoCreateAMQP() throws Exception {
+      internalQueueTest("AMQP", false);
+   }
+
+   @Test
+   public void testQueueNoAutoCreateOpenWire() throws Exception {
+      internalQueueTest("OPENWIRE", false);
+   }
+
+
+   @Test
+   public void testQueueAutoCreateCore() throws Exception {
+      internalQueueTest("CORE", true);
+   }
+
+   @Test
+   public void testDeletoAutoCreateAMQP() throws Exception {
+      internalQueueTest("AMQP", true);
+   }
+
+   @Test
+   public void testQueueAutoCreateOpenWire() throws Exception {
+      internalQueueTest("OPENWIRE", true);
+   }
+
+   public void internalQueueTest(String protocol, boolean autocreate) throws 
Exception {
+      localServer(autocreate);
+
+      String ADDRESS_NAME = getName() + protocol;
+
+      if (!autocreate) {
+         server.addAddressInfo(new 
AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.ANYCAST));
+         server.createQueue(new 
QueueConfiguration(ADDRESS_NAME).setRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      }
+
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61616");
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(ADDRESS_NAME);
+         MessageProducer producer = session.createProducer(queue);
+         producer.send(session.createTextMessage("hello"));
+         session.commit();
+         connection.start();
+
+         try (MessageConsumer consumer = session.createConsumer(queue)) {
+            logger.debug("Sending hello message");
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            Assert.assertEquals("hello", message.getText());
+         }
+
+         session.commit();
+
+         org.apache.activemq.artemis.core.server.Queue serverQueue = 
server.locateQueue(ADDRESS_NAME);
+         Wait.assertEquals(0, serverQueue::getConsumerCount);
+
+         server.destroyQueue(SimpleString.toSimpleString(ADDRESS_NAME));
+
+         boolean exception = false;
+         try {
+            logger.debug("Sending good bye message");
+            producer.send(session.createTextMessage("good bye"));
+            session.commit();
+            logger.debug("Exception was not captured, sent went fine");
+         } catch (Exception e) {
+            logger.debug(e.getMessage(), e);
+            exception = true;
+         }
+
+         if (!autocreate) {
+            Assert.assertTrue(exception);
+         }
+
+         if (autocreate) {
+            logger.debug("creating consumer");
+            try (MessageConsumer consumer = session.createConsumer(queue)) {
+               TextMessage message = (TextMessage) consumer.receive(5000);
+               Assert.assertNotNull(message);
+               Assert.assertEquals("good bye", message.getText());
+            }
+         } else {
+            exception = false;
+            logger.debug("Creating consumer, where an exception is expected");
+            try (MessageConsumer consumer = session.createConsumer(queue)) {
+            } catch (Exception e) {
+               logger.debug("Received exception after createConsumer");
+               exception = true;
+            }
+            Assert.assertTrue(exception);
+         }
+      }
+
+      org.apache.activemq.artemis.core.server.Queue dlqServerQueue = 
server.locateQueue("DLQ");
+      Assert.assertEquals(0, dlqServerQueue.getMessageCount());
+   }
+
+   @Test
+   public void testTopicNoAutoCreateCore() throws Exception {
+      internalMulticastTest("CORE", false); // CORE client; the topic address is pre-created, so sending and subscribing after its removal must fail
+   }
+
+   @Test
+   public void testTopicAutoCreateCore() throws Exception {
+      internalMulticastTest("CORE", true); // CORE client; the test pre-creates no address, and sending/subscribing after removal may succeed or throw
+   }
+
+   @Test
+   public void testTopicNoAutoCreateAMQP() throws Exception {
+      internalMulticastTest("AMQP", false); // AMQP client; the topic address is pre-created, so sending and subscribing after its removal must fail
+   }
+
+   @Test
+   public void testTopicAutoCreateAMQP() throws Exception {
+      internalMulticastTest("AMQP", true); // AMQP client; the test pre-creates no address, and sending/subscribing after removal may succeed or throw
+   }
+
+   @Test
+   public void testTopicNoAutoCreateOPENWIRE() throws Exception {
+      internalMulticastTest("OPENWIRE", false); // OPENWIRE client; the topic address is pre-created, so sending and subscribing after its removal must fail
+   }
+
+   @Test
+   public void testTopicAutoCreateOPENWIRE() throws Exception {
+      internalMulticastTest("OPENWIRE", true); // OPENWIRE client; the test pre-creates no address, and sending/subscribing after removal may succeed or throw
+   }
+
+   public void internalMulticastTest(String protocol, boolean autocreate) 
throws Exception {
+      localServer(autocreate);
+
+      String ADDRESS_NAME = getName() + protocol + "_Topic";
+      final String dlqText = "This should be in DLQ " + 
RandomUtil.randomString();
+
+      if (!autocreate) {
+         server.addAddressInfo(new 
AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.MULTICAST));
+      }
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61616");
+      try (Connection connection = factory.createConnection()) {
+         connection.setClientID("client");
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         Topic destination = session.createTopic(ADDRESS_NAME);
+
+         TopicSubscriber consumer = 
session.createDurableSubscriber(destination, "subs1");
+
+         MessageProducer producer = session.createProducer(destination);
+         producer.send(session.createTextMessage("hello"));
+         session.commit();
+         connection.start();
+
+         logger.debug("Sending hello message");
+         TextMessage message = (TextMessage) consumer.receive(5000);
+         Assert.assertNotNull(message);
+         Assert.assertEquals("hello", message.getText());
+
+         consumer.close();
+
+         session.commit();
+
+         Bindings bindings = 
server.getPostOffice().lookupBindingsForAddress(SimpleString.toSimpleString(ADDRESS_NAME));
+         for (Binding b : bindings.getBindings()) {
+            if (b instanceof LocalQueueBinding) {
+               Wait.assertEquals(0, () -> 
((LocalQueueBinding)b).getQueue().getConsumerCount());
+               server.destroyQueue(b.getUniqueName());
+            }
+         }
+
+         producer.send(session.createTextMessage(dlqText));
+         session.commit();
+
+         server.removeAddressInfo(SimpleString.toSimpleString(ADDRESS_NAME), 
null);
+
+         try {
+            logger.debug("Sending good bye message");
+            producer.send(session.createTextMessage("good bye"));
+            logger.debug("Exception was not captured, sent went fine");
+            if (!autocreate) {
+               session.commit();
+               Assert.fail("Exception was expected");
+            } else {
+               session.rollback();
+            }
+         } catch (Exception e) {
+            logger.debug(e.getMessage(), e);
+         }
+
+         logger.debug("creating consumer");
+         try (TopicSubscriber newSubs = 
session.createDurableSubscriber(destination, "second")) {
+            if (!autocreate) {
+               Assert.fail("exception was expected");
+            }
+         } catch (Exception expected) {
+            logger.debug(expected.getMessage(), expected);
+         }
+
+         org.apache.activemq.artemis.core.server.Queue dlqServerQueue = 
server.locateQueue("DLQ");
+         Assert.assertEquals(1, dlqServerQueue.getMessageCount());
+      }
+
+      try (Connection connection = factory.createConnection()) {
+         connection.setClientID("client");
+         connection.start();
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+
+         MessageConsumer dlqConsumer = 
session.createConsumer(session.createQueue("DLQ"));
+         TextMessage dlqMessage = (TextMessage) dlqConsumer.receive(5000);
+         Assert.assertNotNull(dlqMessage);
+         Assert.assertEquals(dlqText, dlqMessage.getText());
+         Assert.assertNull(dlqConsumer.receiveNoWait());
+      }
+
+
+   }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index b1e96713c5..5914d62b3a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -272,6 +272,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
 
       Message clientFile = createLargeClientMessageStreaming(session, 
messageSize, true);
 
+      logger.debug("****** Send message");
       producer.send(clientFile);
 
       session.commit();
@@ -292,7 +293,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
          msg1.getBodyBuffer().readByte();
          Assert.fail("Exception was expected");
       } catch (final Exception ignored) {
-         // empty on purpose
+         logger.debug(ignored.getMessage(), ignored);
       }
 
       session.close();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
index 222061123b..576d5499ba 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
@@ -636,6 +636,7 @@ public class AddressControlTest extends ManagementTestBase {
       session.createAddress(address, RoutingType.ANYCAST, false);
 
       AddressControl addressControl = createManagementControl(address);
+      Assert.assertNotNull(addressControl);
       assertEquals(0, addressControl.getMessageCount());
 
       ClientProducer producer = session.createProducer(address.toString());


Reply via email to