This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 54c5124b7b ARTEMIS-5941 Route wildcard subscriptions direct to demand bindings
54c5124b7b is described below

commit 54c5124b7b96a36e1f83fb3e36e6bc1787ec448d
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Mar 6 16:07:26 2026 -0500

    ARTEMIS-5941 Route wildcard subscriptions direct to demand bindings
    
    Avoid using the WildcardAddressManager to route to an actual wild-card address
    when a federation link is federating for demand on the wild-card address as that
    method will throw if assertions are enabled and has likely not been tested for
    that use.
---
 .../AMQPFederationAddressPolicyManager.java        |  22 +-
 .../AMQPFederationLocalPolicyManager.java          |  21 ++
 .../connect/AMQPFederationAddressPolicyTest.java   | 234 +++++++++++++++++++++
 3 files changed, 271 insertions(+), 6 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
index 6db4d5c2c2..6703715492 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
@@ -653,12 +653,22 @@ public final class AMQPFederationAddressPolicyManager 
extends AMQPFederationLoca
       }
 
       private boolean isUseConduitConsumer() {
-         // Only use binding filters when configured to do so and the remote 
supports FQQN subscriptions because
-         // we need to be able to open multiple uniquely named queues for an 
address if more than one consumer with
-         // differing filters are present and prior to FQQN subscription 
support we used a simple link name that
-         // would not be unique amongst multiple consumers.
-         return configuration.isIgnoreAddressBindingFilters() ||
-                
!manager.getFederation().getCapabilities().isUseFQQNAddressSubscriptions();
+         if (!manager.getCapabilities().isUseFQQNAddressSubscriptions()) {
+            // prior to FQQN subscription support we used a simple link name 
that would not be unique amongst
+            // multiple consumers on the same address so for most features or 
behaviors that come later we cannot
+            // action them properly and we must fallback to a conduit consumer 
strategy.
+            return true;
+         } else if 
(manager.getWildcardConfiguration().isWild(addressInfo.getName())) {
+            // For a wildcard subscription we want to treat it as a bindings 
aware consumer and route messages only
+            // to the bindings we have tracked as demand on the address. The 
broker wild-card code is not known to
+            // support direct sends to the literal wildcard address and could 
fail to route messages correctly.
+            return false;
+         } else {
+            // If we get here then the only consideration is if we are 
honoring consumers filters on the address
+            // which decides if we can route straight to the address or must 
route to specific bindings groups that
+            // all share the same filter.
+            return configuration.isIgnoreAddressBindingFilters();
+         }
       }
 
       private String generateQueueName(AddressInfo address, Binding binding, 
boolean ignoreFilters) {
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationLocalPolicyManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationLocalPolicyManager.java
index f848564c5d..ecf3b6c31d 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationLocalPolicyManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationLocalPolicyManager.java
@@ -19,6 +19,7 @@ package 
org.apache.activemq.artemis.protocol.amqp.connect.federation;
 import java.lang.invoke.MethodHandles;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.Divert;
 import org.apache.activemq.artemis.core.server.Queue;
@@ -39,10 +40,15 @@ public abstract class AMQPFederationLocalPolicyManager 
extends AMQPFederationPol
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+   private final WildcardConfiguration wildcardConfiguration;
+
    protected volatile AMQPFederationConsumerConfiguration configuration;
+   protected volatile AMQPFederationCapabilities capabilities;
 
    public AMQPFederationLocalPolicyManager(AMQPFederation federation, 
AMQPFederationMetrics metrics, FederationReceiveFromResourcePolicy policy) 
throws ActiveMQException {
       super(federation, metrics, policy.getPolicyName(), 
policy.getPolicyType());
+
+      this.wildcardConfiguration = federation.getWildcardConfiguration();
    }
 
    /**
@@ -57,6 +63,20 @@ public abstract class AMQPFederationLocalPolicyManager 
extends AMQPFederationPol
       return configuration;
    }
 
+   /**
+    * {@return the known connection capabilities at this time the method is 
called}
+    */
+   protected AMQPFederationCapabilities getCapabilities() {
+      return capabilities;
+   }
+
+   /**
+    * {@return the wild-card configuration the federation was configured with 
when created}
+    */
+   protected WildcardConfiguration getWildcardConfiguration() {
+      return wildcardConfiguration;
+   }
+
    @Override
    protected final void handleManagerInitialized() {
       server.registerBrokerPlugin(this);
@@ -104,6 +124,7 @@ public abstract class AMQPFederationLocalPolicyManager 
extends AMQPFederationPol
       // Capture state for the current connection on each connection as 
different URIs could have different options we
       // need to capture in the current configuration state.
       configuration = new 
AMQPFederationConsumerConfiguration(federation.getConfiguration(), 
getPolicy().getProperties());
+      capabilities = federation.getCapabilities();
 
       updateStateAfterConnect(configuration, session);
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
index 167e0075cd..71b8d48117 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
@@ -7312,6 +7312,240 @@ public class AMQPFederationAddressPolicyTest extends 
AmqpClientTestSupport {
       }
    }
 
+   @Test
+   @Timeout(20)
+   public void 
testWildcardSubscriptionRoutedToBindingWhenFederatedFromRemote() throws 
Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect();
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.expectAttach().ofSender()
+                            
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+                            .withProperty(FEDERATION_VERSION.toString(), 
FEDERATION_V2)
+                            .respondInKind()
+                            .withProperty(FEDERATION_VERSION.toString(), 
FEDERATION_V2);
+         peer.expectAttach().ofReceiver()
+                            
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
+                            .respondInKind();
+         peer.expectFlow().withLinkCredit(10);
+         peer.start();
+
+         final String wildcardAddressA = getTestName() + ".A.#";
+         final String wildcardAddressB = getTestName() + ".B.#";
+
+         final URI remoteURI = peer.getServerURI();
+         logger.info("Test started, peer listening on: {}", remoteURI);
+
+         final AMQPFederationAddressPolicyElement receiveFromAddress = new 
AMQPFederationAddressPolicyElement();
+         receiveFromAddress.setName("address-policy");
+         receiveFromAddress.addToIncludes(wildcardAddressA);
+         receiveFromAddress.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+         final AMQPFederatedBrokerConnectionElement element = new 
AMQPFederatedBrokerConnectionElement();
+         element.setName(getTestName());
+         element.addLocalAddressPolicy(receiveFromAddress);
+
+         final AMQPBrokerConnectConfiguration amqpConnection =
+            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(0);// No reconnects
+         amqpConnection.addElement(element);
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofReceiver()
+                            
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withName(allOf(containsString(wildcardAddressA),
+                                            containsString("address-receiver"),
+                                            
containsString(server.getNodeID().toString())))
+                            
.withSource().withAddress(not(containsString("filterId"))).also()
+                            .respondInKind();
+         peer.expectFlow().withLinkCredit(1000);
+
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            // Both should have the message routed to them by a single 
federation receiver link to the remote
+            final MessageConsumer consumerA1 = 
session.createConsumer(session.createTopic(wildcardAddressA));
+            final MessageConsumer consumerA2 = 
session.createConsumer(session.createTopic(wildcardAddressA));
+            // Should not match and should not create any federation links
+            final MessageConsumer consumerB = 
session.createConsumer(session.createTopic(wildcardAddressB));
+
+            connection.start();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.expectDisposition().withSettled(true).withState().accepted();
+            peer.remoteTransfer().withMessage()
+                                 .withBody().withString("test-message").also()
+                                 .withDeliveryId(0)
+                                 .later(10);
+
+            final Message messageA1 = consumerA1.receive(5_000);
+            assertNotNull(messageA1);
+            assertInstanceOf(TextMessage.class, messageA1);
+            assertEquals("test-message", ((TextMessage) messageA1).getText());
+
+            final Message messageA2 = consumerA2.receive(5_000);
+            assertNotNull(messageA2);
+            assertInstanceOf(TextMessage.class, messageA2);
+            assertEquals("test-message", ((TextMessage) messageA2).getText());
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.expectFlow().withLinkCredit(999).withDrain(true)
+                             .respond()
+                             
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+            peer.expectDetach(); // demand will be gone and receiver link 
should close.
+
+            assertNull(consumerB.receiveNoWait());
+         }
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testWildcardSubscriptionWithFiltersRoutedToBindingsWhenFederatedFromRemote() 
throws Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect();
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.expectAttach().ofSender()
+                            
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+                            .withProperty(FEDERATION_VERSION.toString(), 
FEDERATION_V2)
+                            .respondInKind()
+                            .withProperty(FEDERATION_VERSION.toString(), 
FEDERATION_V2);
+         peer.expectAttach().ofReceiver()
+                            
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
+                            .respondInKind();
+         peer.expectFlow().withLinkCredit(10);
+         peer.start();
+
+         final String wildcardAddressA = getTestName() + ".A.#";
+         final String wildcardAddressB = getTestName() + ".B.#";
+         final String expectedJMSFilter = "color='red'";
+         final AtomicReference<Attach> capturedAttach = new 
AtomicReference<>();
+         final Symbol jmsSelectorKey = Symbol.valueOf("jms-selector");
+         final 
org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong 
jmsSelectorCode =
+            
org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong.valueOf(0x0000468C00000004L);
+
+         final URI remoteURI = peer.getServerURI();
+         logger.info("Test started, peer listening on: {}", remoteURI);
+
+         final AMQPFederationAddressPolicyElement receiveFromAddress = new 
AMQPFederationAddressPolicyElement();
+         receiveFromAddress.setName("address-policy");
+         receiveFromAddress.addToIncludes(wildcardAddressA);
+         receiveFromAddress.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+         receiveFromAddress.addProperty(IGNORE_ADDRESS_BINDING_FILTERS, 
"false");
+
+         final AMQPFederatedBrokerConnectionElement element = new 
AMQPFederatedBrokerConnectionElement();
+         element.setName(getTestName());
+         element.addLocalAddressPolicy(receiveFromAddress);
+
+         final AMQPBrokerConnectConfiguration amqpConnection =
+            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(0);// No reconnects
+         amqpConnection.addElement(element);
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofReceiver()
+                            
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withCapture(attach -> capturedAttach.set(attach))
+                            .withName(allOf(containsString(wildcardAddressA),
+                                            containsString("address-receiver"),
+                                            
containsString(server.getNodeID().toString())))
+                            
.withSource().withAddress(containsString("filterId")).also()
+                            .respondInKind();
+         peer.expectFlow().withLinkCredit(1000);
+
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            // Both should have the message routed to them by a single 
federation receiver link to the remote
+            // that link should have a filter assigned to it so the remote 
only sends matches and nothing else.
+            final MessageConsumer consumerA1 = 
session.createConsumer(session.createTopic(wildcardAddressA), "color='red'");
+            final MessageConsumer consumerA2 = 
session.createConsumer(session.createTopic(wildcardAddressA), "color='red'");
+            // Should not match and should not create any federation links
+            final MessageConsumer consumerB = 
session.createConsumer(session.createTopic(wildcardAddressB));
+
+            connection.start();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.expectDisposition().withSettled(true).withState().accepted();
+            peer.remoteTransfer().withMessage()
+                                 .withBody().withString("test-message").also()
+                                 
.withApplicationProperties().withProperty("color", "red").also()
+                                 .withDeliveryId(0)
+                                 .later(10);
+
+            final Message messageA1 = consumerA1.receive(5_000);
+            assertNotNull(messageA1);
+            assertInstanceOf(TextMessage.class, messageA1);
+            assertEquals("test-message", ((TextMessage) messageA1).getText());
+
+            final Message messageA2 = consumerA2.receive(5_000);
+            assertNotNull(messageA2);
+            assertInstanceOf(TextMessage.class, messageA2);
+            assertEquals("test-message", ((TextMessage) messageA2).getText());
+
+            final Map<Symbol, Object> filtersMap1 = 
capturedAttach.get().getSource().getFilter();
+
+            assertNotNull(filtersMap1);
+            assertTrue(filtersMap1.containsKey(jmsSelectorKey));
+            final DescribedType jmsSelectorEntry = (DescribedType) 
filtersMap1.get(jmsSelectorKey);
+            assertNotNull(jmsSelectorEntry);
+            assertEquals(jmsSelectorEntry.getDescriptor(), jmsSelectorCode);
+            assertEquals(jmsSelectorEntry.getDescribed().toString(), 
expectedJMSFilter);
+
+            // Attach another consumer on the same wild card address but with 
a different filter and we should
+            // see a new receiver link opened as that binding needs its own 
receiver.
+            peer.expectAttach().ofReceiver()
+                               
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+                               .withCapture(attach -> 
capturedAttach.set(attach))
+                               
.withName(allOf(containsString(wildcardAddressA),
+                                               
containsString("address-receiver"),
+                                               
containsString(server.getNodeID().toString())))
+                               
.withSource().withAddress(containsString("filterId")).also()
+                               .respondInKind();
+            peer.expectFlow().withLinkCredit(1000);
+
+            final MessageConsumer consumerA3 = 
session.createConsumer(session.createTopic(wildcardAddressA), "color='blue'");
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+            assertNull(consumerA3.receiveNoWait());  // Should be nothing to 
route here.
+
+            // Consumer A3 close should trigger federation link close with no 
messages read.
+            peer.expectFlow().withLinkCredit(1000).withDrain(true)
+                             .respond()
+                             
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+            peer.expectDetach();
+
+            consumerA3.close();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+            // Now the other two consumers will close and the link should be 
torn down.
+            peer.expectFlow().withLinkCredit(999).withDrain(true)
+                             .respond()
+                             
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+            peer.expectDetach();
+
+            assertNull(consumerB.receive(100));
+         }
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+      }
+   }
+
    protected void configureSecurity(ActiveMQServer server, String allowed, 
String restricted, String... userAllowedOnly) {
       ActiveMQJAASSecurityManager securityManager = 
(ActiveMQJAASSecurityManager) server.getSecurityManager();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to