tabish121 commented on code in PR #5161:
URL: https://github.com/apache/activemq-artemis/pull/5161#discussion_r1725280820


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java:
##########
@@ -3882,6 +3883,300 @@ public void 
testRemoteFederationReceiverCloseWithErrorTerminateRemoteConnection(
       }
    }
 
+   @Test
+   @Timeout(20)
+   public void 
testRemoteReceiverClosedWhenDemandRemovedCleansUpAddressBinding() throws 
Exception {
+      server.start();
+      server.addAddressInfo(new AddressInfo(SimpleString.of("test"), 
RoutingType.MULTICAST));
+
+      final Map<String, Object> remoteSourceProperties = new HashMap<>();
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true);
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 1_000L);
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, "test");
+         peer.connect("localhost", AMQP_PORT);
+
+         // Precondition is that there were no bindings before the federation 
receiver attaches.
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         peer.expectAttach().ofSender().withName("federation-address-receiver")
+                                       
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                                       .withTarget().also()
+                                       .withSource().withAddress("test");
+
+         // Connect to remote as if some demand had matched our federation 
policy
+         peer.remoteAttach().ofReceiver()
+                            
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withName("federation-address-receiver")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), 
remoteSourceProperties)
+                            .withSource().withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withAddress("test")
+                                         .withCapabilities("topic")
+                                         .and()
+                            .withTarget().and()
+                            .now();
+         peer.remoteFlow().withLinkCredit(10).now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectTransfer().accept();
+
+         // Federation consumer should be bound to the server's address
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+
+         // Federate a message to check link is attached properly
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final MessageProducer producer = 
session.createProducer(session.createTopic("test"));
+
+            producer.send(session.createMessage());
+         }
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectDetach();
+         peer.remoteDetach().now();  // simulate demand removed so consumer is 
closed.
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         // Federation consumer should no longer be bound to the server's 
address
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+
+         server.stop();
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void testRemoteConnectionSuddenCloseLeaveAddressBindingIntact() 
throws Exception {
+      server.start();
+      server.addAddressInfo(new AddressInfo(SimpleString.of("test"), 
RoutingType.MULTICAST));
+
+      final Map<String, Object> remoteSourceProperties = new HashMap<>();
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE, false);
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, "test");
+         peer.connect("localhost", AMQP_PORT);
+
+         // Precondition is that there were no bindings before the federation 
receiver attaches.
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         peer.expectAttach().ofSender().withName("federation-address-receiver")
+                                       
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                                       .withTarget().also()
+                                       .withSource().withAddress("test");
+
+         // Connect to remote as if some demand had matched our federation 
policy
+         peer.remoteAttach().ofReceiver()
+                            
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withName("federation-address-receiver")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), 
remoteSourceProperties)
+                            .withSource().withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withAddress("test")
+                                         .withCapabilities("topic")
+                                         .and()
+                            .withTarget().and()
+                            .now();
+         peer.remoteFlow().withLinkCredit(10).now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectTransfer().accept();
+
+         // Federation consumer should be bound to the server's address
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+
+         // Federate a message to check link is attached properly
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final MessageProducer producer = 
session.createProducer(session.createTopic("test"));
+
+            producer.send(session.createMessage());
+         }
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+
+         // Unexpected connection drop should leave durable federation address 
subscription in place.
+         Wait.assertTrue(() -> server.getConnectionCount() == 0);
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+      }
+
+      // Send a message to check that the federation binding holds onto sends 
while the remote is offline
+      // due to connectivity issues.
+      final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", 
"tcp://localhost:" + AMQP_PORT);
+
+      try (Connection connection = factory.createConnection()) {
+         final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         final MessageProducer producer = 
session.createProducer(session.createTopic("test"));
+
+         producer.send(session.createMessage());

Review Comment:
   This is already tested by the wait for transfer and accept of the previously 
sent message



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org
For additional commands, e-mail: gitbox-h...@activemq.apache.org
For further information, visit: https://activemq.apache.org/contact


Reply via email to