This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new dcc1afdbe8 ARTEMIS-5381 Use FQQN to remove federation address
subscription if set
dcc1afdbe8 is described below
commit dcc1afdbe810d615849d0dd9e9ef5430cb123bdf
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Apr 10 16:40:00 2025 -0400
ARTEMIS-5381 Use FQQN to remove federation address subscription if set
If the remote used an FQQN to define the address and stable subscription
for the address consumer, recover the subscription queue name from the FQQN
(instead of from the link name) when the remote closes the consumer because
demand for that address has been removed on the remote.
---
.../AMQPFederationAddressSenderController.java | 11 ++-
.../connect/AMQPFederationAddressPolicyTest.java | 82 ++++++++++++++++++++++
2 files changed, 92 insertions(+), 1 deletion(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java
index acc5006e84..cb62815242 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressSenderController.java
@@ -223,9 +223,18 @@ public final class AMQPFederationAddressSenderController
extends AMQPFederationS
try {
final Sender sender = senderContext.getSender();
final Source source = (Source) sender.getRemoteSource();
- final SimpleString queueName = SimpleString.of(sender.getName());
+ final SimpleString sourceAddress =
SimpleString.of(source.getAddress());
final RoutingType routingType = getRoutingType(source);
+ final SimpleString queueName;
+
+ // Either we have negotiated subscriptions using FQQN or we default
to older behavior based on link names
+ if (CompositeAddress.isFullyQualified(sourceAddress)) {
+ queueName = CompositeAddress.extractQueueName(sourceAddress);
+ } else {
+ queueName = SimpleString.of(sender.getName());
+ }
+
final QueueQueryResult queueQuery = sessionSPI.queueQuery(queueName,
routingType, false);
if (queueQuery.isExists()) {
sessionSPI.deleteQueue(queueName);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
index 1764761dc9..6a4d7775c9 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
@@ -4471,6 +4471,88 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
producer.send(session.createMessage());
}
+ // Federation consumer should create a binding using the link name
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().contains(SimpleString.of("federation-address-receiver")));
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDetach();
+ peer.remoteDetach().now(); // simulate demand removed so consumer is
closed.
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ // Federation consumer should no longer be bound to the server's
address
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().isEmpty());
+
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ server.stop();
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void
testRemoteReceiverClosedWhenDemandRemovedCleansUpAddressBindingWhenUsingFQQN()
throws Exception {
+ server.start();
+ server.addAddressInfo(new AddressInfo(SimpleString.of("test"),
RoutingType.MULTICAST));
+
+ final Map<String, Object> remoteSourceProperties = new HashMap<>();
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE, false);
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 1_000L);
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, "test", true);
+ peer.connect("localhost", AMQP_PORT);
+
+ // Precondition is that there were no bindings before the federation
receiver attaches.
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().isEmpty());
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectAttach().ofSender().withName("federation-address-receiver")
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withTarget().also()
+
.withSource().withAddress("test::test-address-binding");
+
+ // Connect to remote as if some demand had matched our federation
policy
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName("federation-address-receiver")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
remoteSourceProperties)
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+
.withAddress("test::test-address-binding")
+ .withCapabilities("topic")
+ .and()
+ .withTarget().and()
+ .now();
+ peer.remoteFlow().withLinkCredit(10).now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectTransfer().accept();
+
+ // Federation consumer should be bound to the server's address
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+
+ // Federate a message to check link is attached properly
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer producer =
session.createProducer(session.createTopic("test"));
+
+ producer.send(session.createMessage());
+ }
+
+ // Federation consumer should use the queue part of the FQQN we set
on the source address
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().contains(SimpleString.of("test-address-binding")));
+
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectDetach();
peer.remoteDetach().now(); // simulate demand removed so consumer is
closed.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact