This is an automated email from the ASF dual-hosted git repository.

brusdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 8dfe0a8d99 ARTEMIS-5337 consumer + auto-create receives from existing 
wildcard queue
8dfe0a8d99 is described below

commit 8dfe0a8d99811c270f46f7f077f08ca0321b6927
Author: Justin Bertram <[email protected]>
AuthorDate: Fri Mar 7 15:07:22 2025 -0600

    ARTEMIS-5337 consumer + auto-create receives from existing wildcard queue
    
    Most protocols don't natively understand the address/queue model used by
    Artemis. When those protocols want anycast semantics the standard
    behavior is to create an address and a queue with the same name.
    However, sometimes a queue already exists on the address with that
    particular name in which case the broker will attach the consumer to
    that queue.
    
    This is acceptable most of the time, but it is problematic when that
    queue uses a wild-card. In this case the consumer will potentially
    receive messages sent to other addresses (i.e. whatever the wild-card
    covers).
    
    This commit fixes that by detecting the wild-card queue and ignoring it.
    Instead the broker will create a new queue specifically for the incoming
    consumer.
---
 .../core/protocol/openwire/OpenWireConnection.java |  7 +-
 .../artemis/core/config/WildcardConfiguration.java | 27 +++++++-
 .../core/postoffice/impl/SimpleAddressManager.java |  2 +-
 .../core/server/impl/ServerSessionImpl.java        |  6 +-
 .../impl/HierarchicalObjectRepository.java         | 10 +--
 .../core/config/WildcardConfigurationTest.java     | 32 ++++++++-
 .../jms/multiprotocol/JMSMessageConsumerTest.java  | 78 ++++++++++++++++++++++
 7 files changed, 144 insertions(+), 18 deletions(-)

diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 5446dedfe3..cd46df13bc 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -70,6 +70,7 @@ import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
 import org.apache.activemq.artemis.core.server.TempQueueObserver;
 import org.apache.activemq.artemis.core.server.impl.RefsOperation;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.ResourceManager;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import 
org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
@@ -743,7 +744,6 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
 
    @Override
    public void fail(ActiveMQException me, String message) {
-
       final ThresholdActor<Command> localVisibleActor = openWireActor;
       if (localVisibleActor != null) {
          localVisibleActor.requestShutdown();
@@ -1022,7 +1022,10 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
    }
 
    public void addKnownDestination(final SimpleString address) {
-      knownDestinations.add(address);
+      AddressSettings addressSettings = 
server.getAddressSettingsRepository().getMatch(address.toString());
+      if (!addressSettings.isAutoDeleteAddresses() && 
!addressSettings.isAutoDeleteQueues()) {
+         knownDestinations.add(address);
+      }
    }
 
    public boolean containsKnownDestination(final SimpleString address) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java
index f99734e60a..e43eeb749f 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java
@@ -178,6 +178,26 @@ public class WildcardConfiguration implements Serializable 
{
       }
    }
 
+   /**
+    * Detect whether the input {@code CharSequence} contains any unescaped 
"single word" or "any words" characters.
+    *
+    * {@code CharSequence} is used here to support both {@code String} and 
{@code SimpleString} objects.
+    */
+   public boolean isWild(CharSequence input) {
+      if (input == null || input.isEmpty()) {
+         return false;
+      } else if (input.charAt(0) == getSingleWord() || input.charAt(0) == 
getAnyWords()) {
+         return true;
+      } else {
+         for (int i = 1; i < input.length(); i++) {
+            if ((input.charAt(i) == getSingleWord() || input.charAt(i) == 
getAnyWords()) && input.charAt(i - 1) != ESCAPE) {
+               return true;
+            }
+         }
+      }
+      return false;
+   }
+
    private String escape(final String input, WildcardConfiguration from) {
       String result = input.replace(escapeString, escapeString + escapeString);
       if (delimiter != from.getDelimiter()) {
@@ -200,7 +220,12 @@ public class WildcardConfiguration implements Serializable 
{
          .replace(ESCAPE + getAnyWordsString(), getAnyWordsString());
    }
 
-   private boolean isEscaped(final String input) {
+   /**
+    * {@return whether the input contains any escaped characters}
+    *
+    * @param input the {@code CharSequence} to inspect
+    */
+   private boolean isEscaped(final CharSequence input) {
       for (int i = 0; i < input.length() - 1; i++) {
          if (input.charAt(i) == ESCAPE && (input.charAt(i + 1) == 
getDelimiter() || input.charAt(i + 1) == getSingleWord() || input.charAt(i + 1) 
== getAnyWords())) {
             return true;
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
index dfa2c798ed..7e57ca4b48 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/SimpleAddressManager.java
@@ -198,7 +198,7 @@ public class SimpleAddressManager implements AddressManager 
{
          Bindings bindings = mappings.get(realAddress);
          if (bindings != null) {
             for (Binding theBinding : bindings.getBindings()) {
-               if (theBinding instanceof LocalQueueBinding) {
+               if (theBinding instanceof LocalQueueBinding && 
!wildcardConfiguration.isWild(theBinding.getUniqueName())) {
                   binding = theBinding;
                   break;
                }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 3c071da721..68516028a7 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -61,7 +61,6 @@ import 
org.apache.activemq.artemis.core.persistence.StorageManager;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.BindingType;
-import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
@@ -1833,9 +1832,8 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
          Queue q = server.locateQueue(unPrefixedQueue);
          if (q == null) {
             // The queue doesn't exist.
-            Bindings bindings = 
server.getPostOffice().lookupBindingsForAddress(unPrefixedAddress);
-            if (bindings != null && bindings.hasLocalBinding() && 
!queueConfig.isFqqn()) {
-               // The address has another queue with a different name, which 
is fine. Just ignore it.
+            if (!queueConfig.isFqqn() && 
server.getPostOffice().getMatchingQueue(unPrefixedAddress, 
queueConfig.getRoutingType()) != null) {
+               // The address has a local, non-wildcard queue with a different 
name, which is fine. Just ignore it.
                result = AutoCreateResult.EXISTED;
             } else if (addressSettings.isAutoCreateQueues() || 
queueConfig.isTemporary()) {
                // Try to create the queue.
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/HierarchicalObjectRepository.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/HierarchicalObjectRepository.java
index f483dd21c4..07fdaeb101 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/HierarchicalObjectRepository.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/HierarchicalObjectRepository.java
@@ -196,7 +196,7 @@ public class HierarchicalObjectRepository<T> implements 
HierarchicalRepository<T
       lock.writeLock().lock();
       try {
          // an exact match (i.e. one without wildcards) won't impact any other 
matches so no need to clear the cache
-         if (usesWildcards(modifiedMatch)) {
+         if (wildcardConfiguration.isWild(modifiedMatch)) {
             clearCache();
          } else if (modifiedMatch != null && cache.containsKey(modifiedMatch)) 
{
             cache.remove(modifiedMatch);
@@ -209,7 +209,7 @@ public class HierarchicalObjectRepository<T> implements 
HierarchicalRepository<T
          Match<T> match1 = new Match<>(modifiedMatch, value, 
wildcardConfiguration, literal);
          if (literal) {
             literalMatches.put(modifiedMatch, match1);
-         } else if (usesWildcards(modifiedMatch)) {
+         } else if (wildcardConfiguration.isWild(modifiedMatch)) {
             wildcardMatches.put(modifiedMatch, match1);
          } else {
             exactMatches.put(modifiedMatch, match1);
@@ -224,10 +224,6 @@ public class HierarchicalObjectRepository<T> implements 
HierarchicalRepository<T
       }
    }
 
-   private boolean usesWildcards(String modifiedMatch) {
-      return modifiedMatch == null ? false : 
(modifiedMatch.contains(wildcardConfiguration.getAnyWordsString()) || 
modifiedMatch.contains(wildcardConfiguration.getSingleWordString()));
-   }
-
    @Override
    public int getCacheSize() {
       return cache.size();
@@ -306,7 +302,7 @@ public class HierarchicalObjectRepository<T> implements 
HierarchicalRepository<T
              * Clear the cache before removing the match, but only if the 
match used wildcards. This will force any
              * thread at {@link #getMatch(String)} to get the lock to 
recompute.
              */
-            if (usesWildcards(modMatch)) {
+            if (wildcardConfiguration.isWild(modMatch)) {
                clearCache();
                wildcardMatches.remove(modMatch);
             } else {
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/WildcardConfigurationTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/WildcardConfigurationTest.java
index 2a2d18d472..25bbf365df 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/WildcardConfigurationTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/WildcardConfigurationTest.java
@@ -16,12 +16,14 @@
  */
 package org.apache.activemq.artemis.core.config;
 
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.jupiter.api.Test;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
-
-import org.apache.activemq.artemis.utils.RandomUtil;
-import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class WildcardConfigurationTest {
 
@@ -124,4 +126,28 @@ public class WildcardConfigurationTest {
       assertNotEquals(b, a);
       assertNotEquals(a.hashCode(), b.hashCode());
    }
+
+   @Test
+   public void testIsWild() {
+      assertFalse(DEFAULT_WILDCARD.isWild(null));
+      assertFalse(DEFAULT_WILDCARD.isWild(""));
+
+      assertFalse(DEFAULT_WILDCARD.isWild("a"));
+      assertFalse(DEFAULT_WILDCARD.isWild("a.b"));
+      assertFalse(DEFAULT_WILDCARD.isWild("a\\.\\#"));
+      assertFalse(DEFAULT_WILDCARD.isWild("a\\.\\*"));
+      assertFalse(DEFAULT_WILDCARD.isWild("\\*"));
+      assertFalse(DEFAULT_WILDCARD.isWild("\\#"));
+
+      assertTrue(DEFAULT_WILDCARD.isWild("*"));
+      assertTrue(DEFAULT_WILDCARD.isWild("#"));
+      assertTrue(DEFAULT_WILDCARD.isWild("a.*"));
+      assertTrue(DEFAULT_WILDCARD.isWild("*.b"));
+      assertTrue(DEFAULT_WILDCARD.isWild("a.*.c"));
+      assertTrue(DEFAULT_WILDCARD.isWild("a.#"));
+      assertTrue(DEFAULT_WILDCARD.isWild("a.b.#"));
+      assertTrue(DEFAULT_WILDCARD.isWild("a.*.#"));
+      assertTrue(DEFAULT_WILDCARD.isWild("a.*.\\#"));
+      assertTrue(DEFAULT_WILDCARD.isWild("a.\\*.#"));
+   }
 }
\ No newline at end of file
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java
index 15e98cf41d..814fce61cc 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSMessageConsumerTest.java
@@ -34,6 +34,8 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.AddressControl;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeTestAccessor;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.utils.DestinationUtil;
 import org.apache.activemq.artemis.utils.RandomUtil;
@@ -113,6 +115,82 @@ public class JMSMessageConsumerTest extends 
MultiprotocolJMSClientTestSupport {
       }
    }
 
+   @Test
+   @Timeout(30)
+   public void testAnycastPlusWildcardCore() throws Exception {
+      testAnycastPlusWildcard(CoreConnection);
+   }
+
+   @Test
+   @Timeout(30)
+   public void testAnycastPlusWildcardAMQP() throws Exception {
+      testAnycastPlusWildcard(AMQPConnection);
+   }
+
+   @Test
+   @Timeout(30)
+   public void testAnycastPlusWildcardOpenWire() throws Exception {
+      testAnycastPlusWildcard(OpenWireConnection);
+   }
+
+   /*
+    * This test ensures that even if a binding for a wildcard queue already 
exists on an anycast address the broker will
+    * still create a normal queue for a consumer.
+    */
+   private void testAnycastPlusWildcard(ConnectionSupplier supplier) throws 
Exception {
+      final String wildcardQueueName = "a.*.c";
+      final String normalQueueName = "a.b.c";
+
+      Connection wildcardConnection = supplier.createConnection();
+      Connection normalConnection = supplier.createConnection();
+      Connection producerConnection = supplier.createConnection();
+
+      try {
+         Session wildcardSession = wildcardConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         Session normalSession = normalConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+         javax.jms.Queue wildcardQueue = 
wildcardSession.createQueue(wildcardQueueName);
+         javax.jms.Queue normalQueue = 
normalSession.createQueue(normalQueueName);
+
+         MessageConsumer wildcardConsumer = 
wildcardSession.createConsumer(wildcardQueue);
+         assertNotNull(server.locateQueue(wildcardQueueName));
+
+         MessageConsumer normalConsumer = 
normalSession.createConsumer(normalQueue);
+         assertNotNull(server.locateQueue(normalQueueName));
+
+         normalConsumer.close();
+
+         Wait.assertEquals(0L, () -> 
server.locateQueue(normalQueueName).getConsumerCount(), 3000, 20);
+
+         PostOfficeTestAccessor.sweepAndReapAddresses((PostOfficeImpl) 
server.getPostOffice());
+
+         Wait.assertTrue(() -> server.locateQueue(normalQueueName) == null, 
3000, 20);
+         assertNotNull(server.locateQueue(wildcardQueueName));
+
+         normalConsumer = normalSession.createConsumer(normalQueue);
+
+         assertNotNull(server.locateQueue(normalQueueName));
+
+         assertEquals(0L, 
server.locateQueue(wildcardQueueName).getMessageCount());
+         assertEquals(0L, 
server.locateQueue(normalQueueName).getMessageCount());
+
+         Session producerSession = producerConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = 
producerSession.createProducer(normalQueue);
+         producer.send(producerSession.createMessage());
+         producer.close();
+
+         Wait.assertEquals(1L, () -> 
server.locateQueue(wildcardQueueName).getMessageCount(), 1000, 20);
+         Wait.assertEquals(1L, () -> 
server.locateQueue(normalQueueName).getMessageCount(), 1000, 20);
+
+         assertNotNull(wildcardConsumer.receive(200));
+         assertNotNull(normalConsumer.receive(200));
+      } finally {
+         wildcardConnection.close();
+         normalConnection.close();
+         producerConnection.close();
+      }
+   }
+
    @Test
    @Timeout(30)
    public void testQueueRoutingTypeMismatchCore() throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to