gemmellr commented on code in PR #5161:
URL: https://github.com/apache/activemq-artemis/pull/5161#discussion_r1725165739


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java:
##########
@@ -3882,6 +3883,300 @@ public void 
testRemoteFederationReceiverCloseWithErrorTerminateRemoteConnection(
       }
    }
 
+   @Test
+   @Timeout(20)
+   public void 
testRemoteReceiverClosedWhenDemandRemovedCleansUpAddressBinding() throws 
Exception {
+      server.start();
+      server.addAddressInfo(new AddressInfo(SimpleString.of("test"), 
RoutingType.MULTICAST));
+
+      final Map<String, Object> remoteSourceProperties = new HashMap<>();
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true);
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 1_000L);
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);

Review Comment:
   If the test is checking the binding is removed proactively...should it 
actually be setting it to auto-delete at all?



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java:
##########
@@ -3882,6 +3883,300 @@ public void 
testRemoteFederationReceiverCloseWithErrorTerminateRemoteConnection(
       }
    }
 
+   @Test
+   @Timeout(20)
+   public void 
testRemoteReceiverClosedWhenDemandRemovedCleansUpAddressBinding() throws 
Exception {
+      server.start();
+      server.addAddressInfo(new AddressInfo(SimpleString.of("test"), 
RoutingType.MULTICAST));

Review Comment:
   Rather than literal "test" could it use e.g getTestName()? Perhaps assigned 
to a variable to make following the address usage simpler.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java:
##########
@@ -3882,6 +3883,300 @@ public void 
testRemoteFederationReceiverCloseWithErrorTerminateRemoteConnection(
       }
    }
 
+   @Test
+   @Timeout(20)
+   public void 
testRemoteReceiverClosedWhenDemandRemovedCleansUpAddressBinding() throws 
Exception {
+      server.start();
+      server.addAddressInfo(new AddressInfo(SimpleString.of("test"), 
RoutingType.MULTICAST));
+
+      final Map<String, Object> remoteSourceProperties = new HashMap<>();
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true);
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 1_000L);
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, "test");
+         peer.connect("localhost", AMQP_PORT);
+
+         // Precondition is that there were no bindings before the federation 
receiver attaches.
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         peer.expectAttach().ofSender().withName("federation-address-receiver")
+                                       
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                                       .withTarget().also()
+                                       .withSource().withAddress("test");
+
+         // Connect to remote as if some demand had matched our federation 
policy
+         peer.remoteAttach().ofReceiver()
+                            
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withName("federation-address-receiver")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), 
remoteSourceProperties)
+                            .withSource().withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withAddress("test")
+                                         .withCapabilities("topic")
+                                         .and()
+                            .withTarget().and()
+                            .now();
+         peer.remoteFlow().withLinkCredit(10).now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectTransfer().accept();
+
+         // Federation consumer should be bound to the server's address
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+
+         // Federate a message to check link is attached properly
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final MessageProducer producer = 
session.createProducer(session.createTopic("test"));
+
+            producer.send(session.createMessage());
+         }
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectDetach();
+         peer.remoteDetach().now();  // simulate demand removed so consumer is 
closed.
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         // Federation consumer should no longer be bound to the server's 
address
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+
+         server.stop();
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void testRemoteConnectionSuddenCloseLeaveAddressBindingIntact() 
throws Exception {
+      server.start();
+      server.addAddressInfo(new AddressInfo(SimpleString.of("test"), 
RoutingType.MULTICAST));
+
+      final Map<String, Object> remoteSourceProperties = new HashMap<>();
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE, false);
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, "test");
+         peer.connect("localhost", AMQP_PORT);
+
+         // Precondition is that there were no bindings before the federation 
receiver attaches.
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         peer.expectAttach().ofSender().withName("federation-address-receiver")
+                                       
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                                       .withTarget().also()
+                                       .withSource().withAddress("test");
+
+         // Connect to remote as if some demand had matched our federation 
policy
+         peer.remoteAttach().ofReceiver()
+                            
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withName("federation-address-receiver")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), 
remoteSourceProperties)
+                            .withSource().withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withAddress("test")
+                                         .withCapabilities("topic")
+                                         .and()
+                            .withTarget().and()
+                            .now();
+         peer.remoteFlow().withLinkCredit(10).now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectTransfer().accept();
+
+         // Federation consumer should be bound to the server's address
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+
+         // Federate a message to check link is attached properly
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final MessageProducer producer = 
session.createProducer(session.createTopic("test"));
+
+            producer.send(session.createMessage());
+         }
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+
+         // Unexpected connection drop should leave durable federation address 
subscription in place.
+         Wait.assertTrue(() -> server.getConnectionCount() == 0);
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+      }
+
+      // Send a message to check that the federation binding holds onto sends 
while the remote is offline
+      // due to connectivity issues.
+      final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", 
"tcp://localhost:" + AMQP_PORT);
+
+      try (Connection connection = factory.createConnection()) {
+         final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         final MessageProducer producer = 
session.createProducer(session.createTopic("test"));
+
+         producer.send(session.createMessage());

Review Comment:
   Would be good to make the payload specific/different than before, e.g 
"offline", so it can be asserted.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java:
##########
@@ -3882,6 +3883,300 @@ public void 
testRemoteFederationReceiverCloseWithErrorTerminateRemoteConnection(
       }
    }
 
+   @Test
+   @Timeout(20)
+   public void 
testRemoteReceiverClosedWhenDemandRemovedCleansUpAddressBinding() throws 
Exception {
+      server.start();
+      server.addAddressInfo(new AddressInfo(SimpleString.of("test"), 
RoutingType.MULTICAST));
+
+      final Map<String, Object> remoteSourceProperties = new HashMap<>();
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true);
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 1_000L);
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, "test");
+         peer.connect("localhost", AMQP_PORT);
+
+         // Precondition is that there were no bindings before the federation 
receiver attaches.
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         peer.expectAttach().ofSender().withName("federation-address-receiver")
+                                       
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                                       .withTarget().also()
+                                       .withSource().withAddress("test");
+
+         // Connect to remote as if some demand had matched our federation 
policy
+         peer.remoteAttach().ofReceiver()
+                            
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withName("federation-address-receiver")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), 
remoteSourceProperties)
+                            .withSource().withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withAddress("test")
+                                         .withCapabilities("topic")
+                                         .and()
+                            .withTarget().and()
+                            .now();
+         peer.remoteFlow().withLinkCredit(10).now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectTransfer().accept();
+
+         // Federation consumer should be bound to the server's address
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+
+         // Federate a message to check link is attached properly
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final MessageProducer producer = 
session.createProducer(session.createTopic("test"));
+
+            producer.send(session.createMessage());
+         }
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectDetach();
+         peer.remoteDetach().now();  // simulate demand removed so consumer is 
closed.
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         // Federation consumer should no longer be bound to the server's 
address
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+
+         server.stop();
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void testRemoteConnectionSuddenCloseLeaveAddressBindingIntact() 
throws Exception {
+      server.start();
+      server.addAddressInfo(new AddressInfo(SimpleString.of("test"), 
RoutingType.MULTICAST));
+
+      final Map<String, Object> remoteSourceProperties = new HashMap<>();
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE, false);
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, "test");
+         peer.connect("localhost", AMQP_PORT);
+
+         // Precondition is that there were no bindings before the federation 
receiver attaches.
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         peer.expectAttach().ofSender().withName("federation-address-receiver")
+                                       
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                                       .withTarget().also()
+                                       .withSource().withAddress("test");
+
+         // Connect to remote as if some demand had matched our federation 
policy
+         peer.remoteAttach().ofReceiver()
+                            
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withName("federation-address-receiver")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), 
remoteSourceProperties)
+                            .withSource().withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withAddress("test")
+                                         .withCapabilities("topic")
+                                         .and()
+                            .withTarget().and()
+                            .now();
+         peer.remoteFlow().withLinkCredit(10).now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectTransfer().accept();
+
+         // Federation consumer should be bound to the server's address
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+
+         // Federate a message to check link is attached properly
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final MessageProducer producer = 
session.createProducer(session.createTopic("test"));
+
+            producer.send(session.createMessage());
+         }
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+
+         // Unexpected connection drop should leave durable federation address 
subscription in place.
+         Wait.assertTrue(() -> server.getConnectionCount() == 0);
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+      }
+
+      // Send a message to check that the federation binding holds onto sends 
while the remote is offline
+      // due to connectivity issues.
+      final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", 
"tcp://localhost:" + AMQP_PORT);
+
+      try (Connection connection = factory.createConnection()) {
+         final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+         final MessageProducer producer = 
session.createProducer(session.createTopic("test"));
+
+         producer.send(session.createMessage());
+      }
+
+      // Reconnect again as if the remote has recovered from the unexpected 
connection drop
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, "test");
+         peer.connect("localhost", AMQP_PORT);
+
+         // Precondition is that there was still a binding from the previous 
federation whose connection dropped
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         peer.expectAttach().ofSender().withName("federation-address-receiver")
+                                       
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                                       .withTarget().also()
+                                       .withSource().withAddress("test");
+         peer.expectTransfer().accept();
+
+         // Connect again to remote as if local demand still matches our 
federation policy
+         peer.remoteAttach().ofReceiver()
+                            
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withName("federation-address-receiver")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), 
remoteSourceProperties)
+                            .withSource().withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withAddress("test")
+                                         .withCapabilities("topic")
+                                         .and()
+                            .withTarget().and()
+                            .now();
+         peer.remoteFlow().withLinkCredit(10).now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectDetach();
+         peer.remoteDetach().now();  // simulate demand removed so consumer is 
closed.
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         // Federation consumer should no longer be bound to the server's 
address
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+
+         server.stop();
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testFederationAddressBindingCleanedUpAfterConnectionDroppedIfConfiguredTo() 
throws Exception {
+      
doTestFederationAddressBindingAppliesAutoDeletePolicyToCreatedQueue(true);
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testFederationAddressBindingNotCleanedUpAfterConnectionDroppedIfConfiguredNotTo()
 throws Exception {
+      
doTestFederationAddressBindingAppliesAutoDeletePolicyToCreatedQueue(false);
+   }
+
+   private void 
doTestFederationAddressBindingAppliesAutoDeletePolicyToCreatedQueue(boolean 
autoDelete) throws Exception {
+      server.getConfiguration().setAddressQueueScanPeriod(100);
+      server.start();
+      server.addAddressInfo(new AddressInfo(SimpleString.of("test"), 
RoutingType.MULTICAST));
+
+      final Map<String, Object> remoteSourceProperties = new HashMap<>();
+      if (autoDelete) {
+         remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true);
+         remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 200L);
+         remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
+      } else {
+         remoteSourceProperties.put(ADDRESS_AUTO_DELETE, false);
+         remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, -1L);
+         remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
+      }
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, "test");
+         peer.connect("localhost", AMQP_PORT);
+
+         // Precondition is that there were no bindings before the federation 
receiver attaches.
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         peer.expectAttach().ofSender().withName("federation-address-receiver")
+                                       
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                                       .withTarget().also()
+                                       .withSource().withAddress("test");
+
+         // Connect to remote as if some demand had matched our federation 
policy
+         peer.remoteAttach().ofReceiver()
+                            
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withName("federation-address-receiver")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), 
remoteSourceProperties)
+                            .withSource().withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withAddress("test")
+                                         .withCapabilities("topic")
+                                         .and()
+                            .withTarget().and()
+                            .now();
+         peer.remoteFlow().withLinkCredit(10).now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         // Federation consumer should be bound to the server's address
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1, 
5_000, 500);
+
+         final SimpleString binding = 
server.bindingQuery(SimpleString.of("test")).getQueueNames().get(0);
+         assertNotNull(binding);
+         assertTrue(binding.startsWith(SimpleString.of("federation")));
+
+         final QueueQueryResult federationBinding = server.queueQuery(binding);
+         if (autoDelete) {
+            assertTrue(federationBinding.isAutoDelete());
+            assertEquals(200, federationBinding.getAutoDeleteDelay());
+            assertEquals(-1, federationBinding.getAutoDeleteMessageCount());
+         } else {
+            assertFalse(federationBinding.isAutoDelete());
+            assertEquals(-1, federationBinding.getAutoDeleteDelay());
+            assertEquals(-1, federationBinding.getAutoDeleteMessageCount());
+         }
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+
+         if (autoDelete) {
+            // Queue binding should eventually be auto deleted based on 
configuration
+            Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0, 
5_000, 100);
+         } else {
+            // Should still be there as it wasn't marched as auto delete as 
previously validated.

Review Comment:
   marked



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java:
##########
@@ -3882,6 +3883,300 @@ public void 
testRemoteFederationReceiverCloseWithErrorTerminateRemoteConnection(
       }
    }
 
+   @Test
+   @Timeout(20)
+   public void 
testRemoteReceiverClosedWhenDemandRemovedCleansUpAddressBinding() throws 
Exception {
+      server.start();
+      server.addAddressInfo(new AddressInfo(SimpleString.of("test"), 
RoutingType.MULTICAST));
+
+      final Map<String, Object> remoteSourceProperties = new HashMap<>();
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true);
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 1_000L);
+      remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, "test");
+         peer.connect("localhost", AMQP_PORT);
+
+         // Precondition is that there were no bindings before the federation 
receiver attaches.
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         peer.expectAttach().ofSender().withName("federation-address-receiver")
+                                       
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                                       .withTarget().also()
+                                       .withSource().withAddress("test");
+
+         // Connect to remote as if some demand had matched our federation 
policy
+         peer.remoteAttach().ofReceiver()
+                            
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withName("federation-address-receiver")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(), 
remoteSourceProperties)
+                            .withSource().withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withAddress("test")
+                                         .withCapabilities("topic")
+                                         .and()
+                            .withTarget().and()
+                            .now();
+         peer.remoteFlow().withLinkCredit(10).now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectTransfer().accept();
+
+         // Federation consumer should be bound to the server's address
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+
+         // Federate a message to check link is attached properly
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final MessageProducer producer = 
session.createProducer(session.createTopic("test"));
+
+            producer.send(session.createMessage());
+         }
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectDetach();
+         peer.remoteDetach().now();  // simulate demand removed so consumer is 
closed.
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         // Federation consumer should no longer be bound to the server's 
address
+         Wait.assertTrue(() -> 
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+
+         server.stop();
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void testRemoteConnectionSuddenCloseLeaveAddressBindingIntact() 
throws Exception {

Review Comment:
   Maybe SuddenDrop (more in line with the later tests) instead of SuddenClose, 
to help distinguish it wasnt closed in the protocol sense?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org
For additional commands, e-mail: gitbox-h...@activemq.apache.org
For further information, visit: https://activemq.apache.org/contact


Reply via email to