gemmellr commented on code in PR #5161:
URL: https://github.com/apache/activemq-artemis/pull/5161#discussion_r1725351502
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java:
##########
@@ -3882,6 +3883,300 @@ public void
testRemoteFederationReceiverCloseWithErrorTerminateRemoteConnection(
}
}
+ @Test
+ @Timeout(20)
+ public void
testRemoteReceiverClosedWhenDemandRemovedCleansUpAddressBinding() throws
Exception {
+ server.start();
+ server.addAddressInfo(new AddressInfo(SimpleString.of("test"),
RoutingType.MULTICAST));
+
+ final Map<String, Object> remoteSourceProperties = new HashMap<>();
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true);
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 1_000L);
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, "test");
+ peer.connect("localhost", AMQP_PORT);
+
+ // Precondition is that there were no bindings before the federation
receiver attaches.
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectAttach().ofSender().withName("federation-address-receiver")
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withTarget().also()
+ .withSource().withAddress("test");
+
+ // Connect to remote as if some demand had matched our federation
policy
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName("federation-address-receiver")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
remoteSourceProperties)
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withAddress("test")
+ .withCapabilities("topic")
+ .and()
+ .withTarget().and()
+ .now();
+ peer.remoteFlow().withLinkCredit(10).now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectTransfer().accept();
+
+ // Federation consumer should be bound to the server's address
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+
+ // Federate a message to check link is attached properly
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer producer =
session.createProducer(session.createTopic("test"));
+
+ producer.send(session.createMessage());
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDetach();
+ peer.remoteDetach().now(); // simulate demand removed so consumer is
closed.
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ // Federation consumer should no longer be bound to the server's
address
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ server.stop();
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void testRemoteConnectionSuddenCloseLeaveAddressBindingIntact()
throws Exception {
+ server.start();
+ server.addAddressInfo(new AddressInfo(SimpleString.of("test"),
RoutingType.MULTICAST));
+
+ final Map<String, Object> remoteSourceProperties = new HashMap<>();
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE, false);
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, "test");
+ peer.connect("localhost", AMQP_PORT);
+
+ // Precondition is that there were no bindings before the federation
receiver attaches.
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectAttach().ofSender().withName("federation-address-receiver")
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withTarget().also()
+ .withSource().withAddress("test");
+
+ // Connect to remote as if some demand had matched our federation
policy
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName("federation-address-receiver")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
remoteSourceProperties)
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withAddress("test")
+ .withCapabilities("topic")
+ .and()
+ .withTarget().and()
+ .now();
+ peer.remoteFlow().withLinkCredit(10).now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectTransfer().accept();
+
+ // Federation consumer should be bound to the server's address
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+
+ // Federate a message to check link is attached properly
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer producer =
session.createProducer(session.createTopic("test"));
+
+ producer.send(session.createMessage());
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ // Unexpected connection drop should leave durable federation address
subscription in place.
+ Wait.assertTrue(() -> server.getConnectionCount() == 0);
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+ }
+
+ // Send a message to check that the federation binding holds onto sends
while the remote is offline
+ // due to connectivity issues.
+ final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer producer =
session.createProducer(session.createTopic("test"));
+
+ producer.send(session.createMessage());
Review Comment:
With the same no-payload as all the other cases. Since they are all the same
it would have no clue if it ever got the 'wrong [identical] message' for some
reason (e.g bug in processing ack of closing consumer, which has happened...or
unexpected cross test pollution, given they all use the same details, which has
happened).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact