tabish121 commented on code in PR #5161: URL: https://github.com/apache/activemq-artemis/pull/5161#discussion_r1725280820
########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java: ########## @@ -3882,6 +3883,300 @@ public void testRemoteFederationReceiverCloseWithErrorTerminateRemoteConnection( } } + @Test + @Timeout(20) + public void testRemoteReceiverClosedWhenDemandRemovedCleansUpAddressBinding() throws Exception { + server.start(); + server.addAddressInfo(new AddressInfo(SimpleString.of("test"), RoutingType.MULTICAST)); + + final Map<String, Object> remoteSourceProperties = new HashMap<>(); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 1_000L); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L); + + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, "test"); + peer.connect("localhost", AMQP_PORT); + + // Precondition is that there were no bindings before the federation receiver attaches. + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + peer.expectAttach().ofSender().withName("federation-address-receiver") + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withTarget().also() + .withSource().withAddress("test"); + + // Connect to remote as if some demand had matched our federation policy + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName("federation-address-receiver") + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), remoteSourceProperties) + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress("test") + .withCapabilities("topic") + .and() + .withTarget().and() + .now(); + peer.remoteFlow().withLinkCredit(10).now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectTransfer().accept(); + + // Federation consumer should be bound to the server's address + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1); + + // Federate a message to check link is attached properly + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageProducer producer = session.createProducer(session.createTopic("test")); + + producer.send(session.createMessage()); + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectDetach(); + peer.remoteDetach().now(); // simulate demand removed so consumer is closed. + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // Federation consumer should no longer be bound to the server's address + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0); + + peer.expectClose(); + peer.remoteClose().now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + server.stop(); + } + } + + @Test + @Timeout(20) + public void testRemoteConnectionSuddenCloseLeaveAddressBindingIntact() throws Exception { + server.start(); + server.addAddressInfo(new AddressInfo(SimpleString.of("test"), RoutingType.MULTICAST)); + + final Map<String, Object> remoteSourceProperties = new HashMap<>(); + remoteSourceProperties.put(ADDRESS_AUTO_DELETE, false); + + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, "test"); + peer.connect("localhost", AMQP_PORT); + + // Precondition is that there were no bindings before the federation receiver attaches. + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + peer.expectAttach().ofSender().withName("federation-address-receiver") + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withTarget().also() + .withSource().withAddress("test"); + + // Connect to remote as if some demand had matched our federation policy + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName("federation-address-receiver") + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), remoteSourceProperties) + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress("test") + .withCapabilities("topic") + .and() + .withTarget().and() + .now(); + peer.remoteFlow().withLinkCredit(10).now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectTransfer().accept(); + + // Federation consumer should be bound to the server's address + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1); + + // Federate a message to check link is attached properly + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageProducer producer = session.createProducer(session.createTopic("test")); + + producer.send(session.createMessage()); + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + // Unexpected connection drop should leave durable federation address subscription in place. + Wait.assertTrue(() -> server.getConnectionCount() == 0); + Wait.assertTrue(() -> server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1); + } + + // Send a message to check that the federation binding holds onto sends while the remote is offline + // due to connectivity issues. + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); + + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + final MessageProducer producer = session.createProducer(session.createTopic("test")); + + producer.send(session.createMessage()); Review Comment: This is already tested by the wait for transfer and accept of the previously sent message -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For additional commands, e-mail: gitbox-h...@activemq.apache.org For further information, visit: https://activemq.apache.org/contact