This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 54c5124b7b ARTEMIS-5941 Route wildcard subscriptions direct to demand
bindings
54c5124b7b is described below
commit 54c5124b7b96a36e1f83fb3e36e6bc1787ec448d
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Mar 6 16:07:26 2026 -0500
ARTEMIS-5941 Route wildcard subscriptions direct to demand bindings
Avoid using the WildcardAddressManager to route to an actual wild-card address
when a federation link is federating for demand on the wild-card address, as
that method will throw if assertions are enabled and has likely not been tested
for that use.
---
.../AMQPFederationAddressPolicyManager.java | 22 +-
.../AMQPFederationLocalPolicyManager.java | 21 ++
.../connect/AMQPFederationAddressPolicyTest.java | 234 +++++++++++++++++++++
3 files changed, 271 insertions(+), 6 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
index 6db4d5c2c2..6703715492 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
@@ -653,12 +653,22 @@ public final class AMQPFederationAddressPolicyManager
extends AMQPFederationLoca
}
private boolean isUseConduitConsumer() {
- // Only use binding filters when configured to do so and the remote
supports FQQN subscriptions because
- // we need to be able to open multiple uniquely named queues for an
address if more than one consumer with
- // differing filters are present and prior to FQQN subscription
support we used a simple link name that
- // would not be unique amongst multiple consumers.
- return configuration.isIgnoreAddressBindingFilters() ||
-
!manager.getFederation().getCapabilities().isUseFQQNAddressSubscriptions();
+ if (!manager.getCapabilities().isUseFQQNAddressSubscriptions()) {
+ // prior to FQQN subscription support we used a simple link name
that would not be unique amongst
+ // multiple consumers on the same address so for most features or
behaviors that come later we cannot
+ // action them properly and we must fall back to a conduit consumer
strategy.
+ return true;
+ } else if
(manager.getWildcardConfiguration().isWild(addressInfo.getName())) {
+ // For a wildcard subscription we want to treat it as a bindings
aware consumer and route messages only
+ // to the bindings we have tracked as demand on the address. The
broker wild-card code is not known to
+ // support direct sends to the literal wildcard address and could
fail to route messages correctly.
+ return false;
+ } else {
+ // If we get here then the only consideration is whether we are
honoring consumer filters on the address
+ // which decides if we can route straight to the address or must
route to specific bindings groups that
+ // all share the same filter.
+ return configuration.isIgnoreAddressBindingFilters();
+ }
}
private String generateQueueName(AddressInfo address, Binding binding,
boolean ignoreFilters) {
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationLocalPolicyManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationLocalPolicyManager.java
index f848564c5d..ecf3b6c31d 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationLocalPolicyManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationLocalPolicyManager.java
@@ -19,6 +19,7 @@ package
org.apache.activemq.artemis.protocol.amqp.connect.federation;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.Queue;
@@ -39,10 +40,15 @@ public abstract class AMQPFederationLocalPolicyManager
extends AMQPFederationPol
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final WildcardConfiguration wildcardConfiguration;
+
protected volatile AMQPFederationConsumerConfiguration configuration;
+ protected volatile AMQPFederationCapabilities capabilities;
public AMQPFederationLocalPolicyManager(AMQPFederation federation,
AMQPFederationMetrics metrics, FederationReceiveFromResourcePolicy policy)
throws ActiveMQException {
super(federation, metrics, policy.getPolicyName(),
policy.getPolicyType());
+
+ this.wildcardConfiguration = federation.getWildcardConfiguration();
}
/**
@@ -57,6 +63,20 @@ public abstract class AMQPFederationLocalPolicyManager
extends AMQPFederationPol
return configuration;
}
+ /**
+ * {@return the known connection capabilities at the time the method is
called}
+ */
+ protected AMQPFederationCapabilities getCapabilities() {
+ return capabilities;
+ }
+
+ /**
+ * {@return the wild-card configuration the federation was configured with
when created}
+ */
+ protected WildcardConfiguration getWildcardConfiguration() {
+ return wildcardConfiguration;
+ }
+
@Override
protected final void handleManagerInitialized() {
server.registerBrokerPlugin(this);
@@ -104,6 +124,7 @@ public abstract class AMQPFederationLocalPolicyManager
extends AMQPFederationPol
// Capture state for the current connection on each connection as
different URIs could have different options we
// need to capture in the current configuration state.
configuration = new
AMQPFederationConsumerConfiguration(federation.getConfiguration(),
getPolicy().getProperties());
+ capabilities = federation.getCapabilities();
updateStateAfterConnect(configuration, session);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
index 167e0075cd..71b8d48117 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
@@ -7312,6 +7312,240 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
}
}
+ @Test
+ @Timeout(20)
+ public void
testWildcardSubscriptionRoutedToBindingWhenFederatedFromRemote() throws
Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
+ .respondInKind()
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2);
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
+ .respondInKind();
+ peer.expectFlow().withLinkCredit(10);
+ peer.start();
+
+ final String wildcardAddressA = getTestName() + ".A.#";
+ final String wildcardAddressB = getTestName() + ".B.#";
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Test started, peer listening on: {}", remoteURI);
+
+ final AMQPFederationAddressPolicyElement receiveFromAddress = new
AMQPFederationAddressPolicyElement();
+ receiveFromAddress.setName("address-policy");
+ receiveFromAddress.addToIncludes(wildcardAddressA);
+ receiveFromAddress.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+
+ final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addLocalAddressPolicy(receiveFromAddress);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName(allOf(containsString(wildcardAddressA),
+ containsString("address-receiver"),
+
containsString(server.getNodeID().toString())))
+
.withSource().withAddress(not(containsString("filterId"))).also()
+ .respondInKind();
+ peer.expectFlow().withLinkCredit(1000);
+
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ // Both should have the message routed to them by a single
federation receiver link to the remote
+ final MessageConsumer consumerA1 =
session.createConsumer(session.createTopic(wildcardAddressA));
+ final MessageConsumer consumerA2 =
session.createConsumer(session.createTopic(wildcardAddressA));
+ // Should not match and should not create any federation links
+ final MessageConsumer consumerB =
session.createConsumer(session.createTopic(wildcardAddressB));
+
+ connection.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDisposition().withSettled(true).withState().accepted();
+ peer.remoteTransfer().withMessage()
+ .withBody().withString("test-message").also()
+ .withDeliveryId(0)
+ .later(10);
+
+ final Message messageA1 = consumerA1.receive(5_000);
+ assertNotNull(messageA1);
+ assertInstanceOf(TextMessage.class, messageA1);
+ assertEquals("test-message", ((TextMessage) messageA1).getText());
+
+ final Message messageA2 = consumerA2.receive(5_000);
+ assertNotNull(messageA2);
+ assertInstanceOf(TextMessage.class, messageA2);
+ assertEquals("test-message", ((TextMessage) messageA2).getText());
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(999).withDrain(true)
+ .respond()
+
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+ peer.expectDetach(); // demand will be gone and receiver link
should close.
+
+ assertNull(consumerB.receiveNoWait());
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void
testWildcardSubscriptionWithFiltersRoutedToBindingsWhenFederatedFromRemote()
throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2)
+ .respondInKind()
+ .withProperty(FEDERATION_VERSION.toString(),
FEDERATION_V2);
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
+ .respondInKind();
+ peer.expectFlow().withLinkCredit(10);
+ peer.start();
+
+ final String wildcardAddressA = getTestName() + ".A.#";
+ final String wildcardAddressB = getTestName() + ".B.#";
+ final String expectedJMSFilter = "color='red'";
+ final AtomicReference<Attach> capturedAttach = new
AtomicReference<>();
+ final Symbol jmsSelectorKey = Symbol.valueOf("jms-selector");
+ final
org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong
jmsSelectorCode =
+
org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedLong.valueOf(0x0000468C00000004L);
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Test started, peer listening on: {}", remoteURI);
+
+ final AMQPFederationAddressPolicyElement receiveFromAddress = new
AMQPFederationAddressPolicyElement();
+ receiveFromAddress.setName("address-policy");
+ receiveFromAddress.addToIncludes(wildcardAddressA);
+ receiveFromAddress.addProperty(ADDRESS_RECEIVER_IDLE_TIMEOUT, 0);
+ receiveFromAddress.addProperty(IGNORE_ADDRESS_BINDING_FILTERS,
"false");
+
+ final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addLocalAddressPolicy(receiveFromAddress);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withCapture(attach -> capturedAttach.set(attach))
+ .withName(allOf(containsString(wildcardAddressA),
+ containsString("address-receiver"),
+
containsString(server.getNodeID().toString())))
+
.withSource().withAddress(containsString("filterId")).also()
+ .respondInKind();
+ peer.expectFlow().withLinkCredit(1000);
+
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ // Both should have the message routed to them by a single
federation receiver link to the remote
+ // that link should have a filter assigned to it so the remote
only sends matches and nothing else.
+ final MessageConsumer consumerA1 =
session.createConsumer(session.createTopic(wildcardAddressA), "color='red'");
+ final MessageConsumer consumerA2 =
session.createConsumer(session.createTopic(wildcardAddressA), "color='red'");
+ // Should not match and should not create any federation links
+ final MessageConsumer consumerB =
session.createConsumer(session.createTopic(wildcardAddressB));
+
+ connection.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDisposition().withSettled(true).withState().accepted();
+ peer.remoteTransfer().withMessage()
+ .withBody().withString("test-message").also()
+
.withApplicationProperties().withProperty("color", "red").also()
+ .withDeliveryId(0)
+ .later(10);
+
+ final Message messageA1 = consumerA1.receive(5_000);
+ assertNotNull(messageA1);
+ assertInstanceOf(TextMessage.class, messageA1);
+ assertEquals("test-message", ((TextMessage) messageA1).getText());
+
+ final Message messageA2 = consumerA2.receive(5_000);
+ assertNotNull(messageA2);
+ assertInstanceOf(TextMessage.class, messageA2);
+ assertEquals("test-message", ((TextMessage) messageA2).getText());
+
+ final Map<Symbol, Object> filtersMap1 =
capturedAttach.get().getSource().getFilter();
+
+ assertNotNull(filtersMap1);
+ assertTrue(filtersMap1.containsKey(jmsSelectorKey));
+ final DescribedType jmsSelectorEntry = (DescribedType)
filtersMap1.get(jmsSelectorKey);
+ assertNotNull(jmsSelectorEntry);
+ assertEquals(jmsSelectorEntry.getDescriptor(), jmsSelectorCode);
+ assertEquals(jmsSelectorEntry.getDescribed().toString(),
expectedJMSFilter);
+
+ // Attach another consumer on the same wild card address but with
a different filter and we should
+ // see a new receiver link opened as that binding needs its own
receiver.
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withCapture(attach ->
capturedAttach.set(attach))
+
.withName(allOf(containsString(wildcardAddressA),
+
containsString("address-receiver"),
+
containsString(server.getNodeID().toString())))
+
.withSource().withAddress(containsString("filterId")).also()
+ .respondInKind();
+ peer.expectFlow().withLinkCredit(1000);
+
+ final MessageConsumer consumerA3 =
session.createConsumer(session.createTopic(wildcardAddressA), "color='blue'");
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ assertNull(consumerA3.receiveNoWait()); // Should be nothing to
route here.
+
+ // Consumer A3 close should trigger federation link close with no
messages read.
+ peer.expectFlow().withLinkCredit(1000).withDrain(true)
+ .respond()
+
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+ peer.expectDetach();
+
+ consumerA3.close();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ // Now the other two consumers will close and the link should be
torn down.
+ peer.expectFlow().withLinkCredit(999).withDrain(true)
+ .respond()
+
.withLinkCredit(0).withDeliveryCount(1000).withDrain(true);
+ peer.expectDetach();
+
+ assertNull(consumerB.receive(100));
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+ }
+ }
+
protected void configureSecurity(ActiveMQServer server, String allowed,
String restricted, String... userAllowedOnly) {
ActiveMQJAASSecurityManager securityManager =
(ActiveMQJAASSecurityManager) server.getSecurityManager();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]