gemmellr commented on code in PR #5161: URL: https://github.com/apache/activemq-artemis/pull/5161#discussion_r1725351502
########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java: ########## @@ -3882,6 +3883,300 @@ public void testRemoteFederationReceiverCloseWithErrorTerminateRemoteConnection( } } + @Test + @Timeout(20) + public void testRemoteReceiverClosedWhenDemandRemovedCleansUpAddressBinding() throws Exception { + server.start(); + server.addAddressInfo(new AddressInfo(SimpleString.of("test"), RoutingType.MULTICAST)); + + final Map<String, Object> remoteSourceProperties = new HashMap<>(); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 1_000L); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L); + + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, "test"); + peer.connect("localhost", AMQP_PORT); + + // Precondition is that there were no bindings before the federation receiver attaches. + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + peer.expectAttach().ofSender().withName("federation-address-receiver") + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withTarget().also() + .withSource().withAddress("test"); + + // Connect to remote as if some demand had matched our federation policy + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName("federation-address-receiver") + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), remoteSourceProperties) + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress("test") + .withCapabilities("topic") + .and() + .withTarget().and() + .now(); + peer.remoteFlow().withLinkCredit(10).now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectTransfer().accept(); + + // Federation consumer should be bound to the server's address + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1); + + // Federate a message to check link is attached properly + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageProducer producer = session.createProducer(session.createTopic("test")); + + producer.send(session.createMessage()); + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectDetach(); + peer.remoteDetach().now(); // simulate demand removed so consumer is closed. + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Federation consumer should no longer be bound to the server's address + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0); + + peer.expectClose(); + peer.remoteClose().now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + server.stop(); + } + } + + @Test + @Timeout(20) + public void testRemoteConnectionSuddenCloseLeaveAddressBindingIntact() throws Exception { + server.start(); + server.addAddressInfo(new AddressInfo(SimpleString.of("test"), RoutingType.MULTICAST)); + + final Map<String, Object> remoteSourceProperties = new HashMap<>(); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE, false); + + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, "test"); + peer.connect("localhost", AMQP_PORT); + + // Precondition is that there were no bindings before the federation receiver attaches. + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + peer.expectAttach().ofSender().withName("federation-address-receiver") + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withTarget().also() + .withSource().withAddress("test"); + + // Connect to remote as if some demand had matched our federation policy + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName("federation-address-receiver") + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), remoteSourceProperties) + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress("test") + .withCapabilities("topic") + .and() + .withTarget().and() + .now(); + peer.remoteFlow().withLinkCredit(10).now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectTransfer().accept(); + + // Federation consumer should be bound to the server's address + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1); + + // Federate a message to check link is attached properly + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageProducer producer = session.createProducer(session.createTopic("test")); + + producer.send(session.createMessage()); + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + // Unexpected connection drop should leave durable federation address subscription in place. + Wait.assertTrue(() -> server.getConnectionCount() == 0); + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1); + } + + // Send a message to check that the federation binding holds onto sends while the remote is offline + // due to connectivity issues. + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageProducer producer = session.createProducer(session.createTopic("test")); + + producer.send(session.createMessage()); Review Comment: With the same no-payload as all the other cases. Since they are all the same it would have no clue if it ever got the 'wrong [identical] message' for some reason (e.g bug in processing ack of closing consumer, which has happened...or unexpected cross test pollution, given they all use the same details, which has happened). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For additional commands, e-mail: gitbox-h...@activemq.apache.org For further information, visit: https://activemq.apache.org/contact