gemmellr commented on code in PR #5161:
URL: https://github.com/apache/activemq-artemis/pull/5161#discussion_r1725351502


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java:
##########
@@ -3882,6 +3883,300 @@ public void 
testRemoteFederationReceiverCloseWithErrorTerminateRemoteConnection(
       }
    }
 
+   @Test
+   @Timeout(20)
+   public void 
testRemoteReceiverClosedWhenDemandRemovedCleansUpAddressBinding() throws 
Exception {
+      server.start();
+      server.addAddressInfo(new AddressInfo(SimpleString.of("test"), 
RoutingType.MULTICAST));
+
+      final Map<String, Object> remoteSourceProperties = new HashMap<>();
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true);
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 1_000L);
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, "test");
+         peer.connect("localhost", AMQP_PORT);
+
+         // Precondition is that there were no bindings before the federation 
receiver attaches.
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         peer.expectAttach().ofSender().withName("federation-address-receiver")
+                                       
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                                       .withTarget().also()
+                                       .withSource().withAddress("test");
+
+         // Connect to remote as if some demand had matched our federation 
policy
+         peer.remoteAttach().ofReceiver()
+                            
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withName("federation-address-receiver")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), 
remoteSourceProperties)
+                            .withSource().withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withAddress("test")
+                                         .withCapabilities("topic")
+                                         .and()
+                            .withTarget().and()
+                            .now();
+         peer.remoteFlow().withLinkCredit(10).now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectTransfer().accept();
+
+         // Federation consumer should be bound to the server's address
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+
+         // Federate a message to check link is attached properly
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final MessageProducer producer = 
session.createProducer(session.createTopic("test"));
+
+            producer.send(session.createMessage());
+         }
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectDetach();
+         peer.remoteDetach().now();  // simulate demand removed so consumer is 
closed.
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         // Federation consumer should no longer be bound to the server's 
address
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+
+         server.stop();
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void testRemoteConnectionSuddenCloseLeaveAddressBindingIntact() 
throws Exception {
+      server.start();
+      server.addAddressInfo(new AddressInfo(SimpleString.of("test"), 
RoutingType.MULTICAST));
+
+      final Map<String, Object> remoteSourceProperties = new HashMap<>();
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE, false);
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, "test");
+         peer.connect("localhost", AMQP_PORT);
+
+         // Precondition is that there were no bindings before the federation 
receiver attaches.
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         peer.expectAttach().ofSender().withName("federation-address-receiver")
+                                       
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                                       .withTarget().also()
+                                       .withSource().withAddress("test");
+
+         // Connect to remote as if some demand had matched our federation 
policy
+         peer.remoteAttach().ofReceiver()
+                            
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withName("federation-address-receiver")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), 
remoteSourceProperties)
+                            .withSource().withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withAddress("test")
+                                         .withCapabilities("topic")
+                                         .and()
+                            .withTarget().and()
+                            .now();
+         peer.remoteFlow().withLinkCredit(10).now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectTransfer().accept();
+
+         // Federation consumer should be bound to the server's address
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+
+         // Federate a message to check link is attached properly
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final MessageProducer producer = 
session.createProducer(session.createTopic("test"));
+
+            producer.send(session.createMessage());
+         }
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+
+         // Unexpected connection drop should leave durable federation address 
subscription in place.
+         Wait.assertTrue(() -> server.getConnectionCount() == 0);
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+      }
+
+      // Send a message to check that the federation binding holds onto sends 
while the remote is offline
+      // due to connectivity issues.
+      final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", 
"tcp://localhost:" + AMQP_PORT);
+
+      try (Connection connection = factory.createConnection()) {
+         final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         final MessageProducer producer = 
session.createProducer(session.createTopic("test"));
+
+         producer.send(session.createMessage());

Review Comment:
   With the same no-payload as all the other cases. Since they are all the same 
it would have no clue if it ever got the 'wrong [identical] message' for some 
reason (e.g bug in processing ack of closing consumer, which has happened...or 
unexpected cross test pollution, given they all use the same details, which has 
happened).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org
For additional commands, e-mail: gitbox-h...@activemq.apache.org
For further information, visit: https://activemq.apache.org/contact


Reply via email to