gemmellr commented on code in PR #5161:
URL: https://github.com/apache/activemq-artemis/pull/5161#discussion_r1725165739
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java:
##########
@@ -3882,6 +3883,300 @@ public void
testRemoteFederationReceiverCloseWithErrorTerminateRemoteConnection(
}
}
+ @Test
+ @Timeout(20)
+ public void
testRemoteReceiverClosedWhenDemandRemovedCleansUpAddressBinding() throws
Exception {
+ server.start();
+ server.addAddressInfo(new AddressInfo(SimpleString.of("test"),
RoutingType.MULTICAST));
+
+ final Map<String, Object> remoteSourceProperties = new HashMap<>();
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true);
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 1_000L);
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
Review Comment:
If the test is checking that the binding is removed proactively... should it
actually be setting it to auto-delete at all?
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java:
##########
@@ -3882,6 +3883,300 @@ public void
testRemoteFederationReceiverCloseWithErrorTerminateRemoteConnection(
}
}
+ @Test
+ @Timeout(20)
+ public void
testRemoteReceiverClosedWhenDemandRemovedCleansUpAddressBinding() throws
Exception {
+ server.start();
+ server.addAddressInfo(new AddressInfo(SimpleString.of("test"),
RoutingType.MULTICAST));
Review Comment:
Rather than the literal "test", could it use e.g. getTestName()? Perhaps assigned
to a variable to make following the address usage simpler.
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java:
##########
@@ -3882,6 +3883,300 @@ public void
testRemoteFederationReceiverCloseWithErrorTerminateRemoteConnection(
}
}
+ @Test
+ @Timeout(20)
+ public void
testRemoteReceiverClosedWhenDemandRemovedCleansUpAddressBinding() throws
Exception {
+ server.start();
+ server.addAddressInfo(new AddressInfo(SimpleString.of("test"),
RoutingType.MULTICAST));
+
+ final Map<String, Object> remoteSourceProperties = new HashMap<>();
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true);
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 1_000L);
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, "test");
+ peer.connect("localhost", AMQP_PORT);
+
+ // Precondition is that there were no bindings before the federation
receiver attaches.
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectAttach().ofSender().withName("federation-address-receiver")
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withTarget().also()
+ .withSource().withAddress("test");
+
+ // Connect to remote as if some demand had matched our federation
policy
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName("federation-address-receiver")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
remoteSourceProperties)
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withAddress("test")
+ .withCapabilities("topic")
+ .and()
+ .withTarget().and()
+ .now();
+ peer.remoteFlow().withLinkCredit(10).now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectTransfer().accept();
+
+ // Federation consumer should be bound to the server's address
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+
+ // Federate a message to check link is attached properly
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer producer =
session.createProducer(session.createTopic("test"));
+
+ producer.send(session.createMessage());
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDetach();
+ peer.remoteDetach().now(); // simulate demand removed so consumer is
closed.
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ // Federation consumer should no longer be bound to the server's
address
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ server.stop();
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void testRemoteConnectionSuddenCloseLeaveAddressBindingIntact()
throws Exception {
+ server.start();
+ server.addAddressInfo(new AddressInfo(SimpleString.of("test"),
RoutingType.MULTICAST));
+
+ final Map<String, Object> remoteSourceProperties = new HashMap<>();
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE, false);
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, "test");
+ peer.connect("localhost", AMQP_PORT);
+
+ // Precondition is that there were no bindings before the federation
receiver attaches.
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectAttach().ofSender().withName("federation-address-receiver")
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withTarget().also()
+ .withSource().withAddress("test");
+
+ // Connect to remote as if some demand had matched our federation
policy
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName("federation-address-receiver")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
remoteSourceProperties)
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withAddress("test")
+ .withCapabilities("topic")
+ .and()
+ .withTarget().and()
+ .now();
+ peer.remoteFlow().withLinkCredit(10).now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectTransfer().accept();
+
+ // Federation consumer should be bound to the server's address
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+
+ // Federate a message to check link is attached properly
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer producer =
session.createProducer(session.createTopic("test"));
+
+ producer.send(session.createMessage());
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ // Unexpected connection drop should leave durable federation address
subscription in place.
+ Wait.assertTrue(() -> server.getConnectionCount() == 0);
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+ }
+
+ // Send a message to check that the federation binding holds onto sends
while the remote is offline
+ // due to connectivity issues.
+ final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer producer =
session.createProducer(session.createTopic("test"));
+
+ producer.send(session.createMessage());
Review Comment:
It would be good to make the payload specific and different from before, e.g.
"offline", so it can be asserted.
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java:
##########
@@ -3882,6 +3883,300 @@ public void
testRemoteFederationReceiverCloseWithErrorTerminateRemoteConnection(
}
}
+ @Test
+ @Timeout(20)
+ public void
testRemoteReceiverClosedWhenDemandRemovedCleansUpAddressBinding() throws
Exception {
+ server.start();
+ server.addAddressInfo(new AddressInfo(SimpleString.of("test"),
RoutingType.MULTICAST));
+
+ final Map<String, Object> remoteSourceProperties = new HashMap<>();
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true);
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 1_000L);
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, "test");
+ peer.connect("localhost", AMQP_PORT);
+
+ // Precondition is that there were no bindings before the federation
receiver attaches.
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectAttach().ofSender().withName("federation-address-receiver")
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withTarget().also()
+ .withSource().withAddress("test");
+
+ // Connect to remote as if some demand had matched our federation
policy
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName("federation-address-receiver")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
remoteSourceProperties)
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withAddress("test")
+ .withCapabilities("topic")
+ .and()
+ .withTarget().and()
+ .now();
+ peer.remoteFlow().withLinkCredit(10).now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectTransfer().accept();
+
+ // Federation consumer should be bound to the server's address
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+
+ // Federate a message to check link is attached properly
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer producer =
session.createProducer(session.createTopic("test"));
+
+ producer.send(session.createMessage());
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDetach();
+ peer.remoteDetach().now(); // simulate demand removed so consumer is
closed.
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ // Federation consumer should no longer be bound to the server's
address
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ server.stop();
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void testRemoteConnectionSuddenCloseLeaveAddressBindingIntact()
throws Exception {
+ server.start();
+ server.addAddressInfo(new AddressInfo(SimpleString.of("test"),
RoutingType.MULTICAST));
+
+ final Map<String, Object> remoteSourceProperties = new HashMap<>();
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE, false);
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, "test");
+ peer.connect("localhost", AMQP_PORT);
+
+ // Precondition is that there were no bindings before the federation
receiver attaches.
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectAttach().ofSender().withName("federation-address-receiver")
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withTarget().also()
+ .withSource().withAddress("test");
+
+ // Connect to remote as if some demand had matched our federation
policy
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName("federation-address-receiver")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
remoteSourceProperties)
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withAddress("test")
+ .withCapabilities("topic")
+ .and()
+ .withTarget().and()
+ .now();
+ peer.remoteFlow().withLinkCredit(10).now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectTransfer().accept();
+
+ // Federation consumer should be bound to the server's address
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+
+ // Federate a message to check link is attached properly
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer producer =
session.createProducer(session.createTopic("test"));
+
+ producer.send(session.createMessage());
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ // Unexpected connection drop should leave durable federation address
subscription in place.
+ Wait.assertTrue(() -> server.getConnectionCount() == 0);
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+ }
+
+ // Send a message to check that the federation binding holds onto sends
while the remote is offline
+ // due to connectivity issues.
+ final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer producer =
session.createProducer(session.createTopic("test"));
+
+ producer.send(session.createMessage());
+ }
+
+ // Reconnect again as if the remote has recovered from the unexpected
connection drop
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, "test");
+ peer.connect("localhost", AMQP_PORT);
+
+ // Precondition is that there was still a binding from the previous
federation whose connection dropped
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectAttach().ofSender().withName("federation-address-receiver")
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withTarget().also()
+ .withSource().withAddress("test");
+ peer.expectTransfer().accept();
+
+ // Connect again to remote as if local demand still matches our
federation policy
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName("federation-address-receiver")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
remoteSourceProperties)
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withAddress("test")
+ .withCapabilities("topic")
+ .and()
+ .withTarget().and()
+ .now();
+ peer.remoteFlow().withLinkCredit(10).now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDetach();
+ peer.remoteDetach().now(); // simulate demand removed so consumer is
closed.
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ // Federation consumer should no longer be bound to the server's
address
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ server.stop();
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void
testFederationAddressBindingCleanedUpAfterConnectionDroppedIfConfiguredTo()
throws Exception {
+
doTestFederationAddressBindingAppliesAutoDeletePolicyToCreatedQueue(true);
+ }
+
+ @Test
+ @Timeout(20)
+ public void
testFederationAddressBindingNotCleanedUpAfterConnectionDroppedIfConfiguredNotTo()
throws Exception {
+
doTestFederationAddressBindingAppliesAutoDeletePolicyToCreatedQueue(false);
+ }
+
+ private void
doTestFederationAddressBindingAppliesAutoDeletePolicyToCreatedQueue(boolean
autoDelete) throws Exception {
+ server.getConfiguration().setAddressQueueScanPeriod(100);
+ server.start();
+ server.addAddressInfo(new AddressInfo(SimpleString.of("test"),
RoutingType.MULTICAST));
+
+ final Map<String, Object> remoteSourceProperties = new HashMap<>();
+ if (autoDelete) {
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true);
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 200L);
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
+ } else {
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE, false);
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, -1L);
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
+ }
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, "test");
+ peer.connect("localhost", AMQP_PORT);
+
+ // Precondition is that there were no bindings before the federation
receiver attaches.
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectAttach().ofSender().withName("federation-address-receiver")
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withTarget().also()
+ .withSource().withAddress("test");
+
+ // Connect to remote as if some demand had matched our federation
policy
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName("federation-address-receiver")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
remoteSourceProperties)
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withAddress("test")
+ .withCapabilities("topic")
+ .and()
+ .withTarget().and()
+ .now();
+ peer.remoteFlow().withLinkCredit(10).now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ // Federation consumer should be bound to the server's address
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1,
5_000, 500);
+
+ final SimpleString binding =
server.bindingQuery(SimpleString.of("test")).getQueueNames().get(0);
+ assertNotNull(binding);
+ assertTrue(binding.startsWith(SimpleString.of("federation")));
+
+ final QueueQueryResult federationBinding = server.queueQuery(binding);
+ if (autoDelete) {
+ assertTrue(federationBinding.isAutoDelete());
+ assertEquals(200, federationBinding.getAutoDeleteDelay());
+ assertEquals(-1, federationBinding.getAutoDeleteMessageCount());
+ } else {
+ assertFalse(federationBinding.isAutoDelete());
+ assertEquals(-1, federationBinding.getAutoDeleteDelay());
+ assertEquals(-1, federationBinding.getAutoDeleteMessageCount());
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ if (autoDelete) {
+ // Queue binding should eventually be auto deleted based on
configuration
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0,
5_000, 100);
+ } else {
+ // Should still be there as it wasn't marched as auto delete as
previously validated.
Review Comment:
Typo: "marched" should be "marked".
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java:
##########
@@ -3882,6 +3883,300 @@ public void
testRemoteFederationReceiverCloseWithErrorTerminateRemoteConnection(
}
}
+ @Test
+ @Timeout(20)
+ public void
testRemoteReceiverClosedWhenDemandRemovedCleansUpAddressBinding() throws
Exception {
+ server.start();
+ server.addAddressInfo(new AddressInfo(SimpleString.of("test"),
RoutingType.MULTICAST));
+
+ final Map<String, Object> remoteSourceProperties = new HashMap<>();
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE, true);
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 1_000L);
+ remoteSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, "test");
+ peer.connect("localhost", AMQP_PORT);
+
+ // Precondition is that there were no bindings before the federation
receiver attaches.
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectAttach().ofSender().withName("federation-address-receiver")
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withTarget().also()
+ .withSource().withAddress("test");
+
+ // Connect to remote as if some demand had matched our federation
policy
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName("federation-address-receiver")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
remoteSourceProperties)
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withAddress("test")
+ .withCapabilities("topic")
+ .and()
+ .withTarget().and()
+ .now();
+ peer.remoteFlow().withLinkCredit(10).now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectTransfer().accept();
+
+ // Federation consumer should be bound to the server's address
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 1);
+
+ // Federate a message to check link is attached properly
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer producer =
session.createProducer(session.createTopic("test"));
+
+ producer.send(session.createMessage());
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDetach();
+ peer.remoteDetach().now(); // simulate demand removed so consumer is
closed.
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ // Federation consumer should no longer be bound to the server's
address
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.of("test")).getQueueNames().size() == 0);
+
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ server.stop();
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void testRemoteConnectionSuddenCloseLeaveAddressBindingIntact()
throws Exception {
Review Comment:
Maybe SuddenDrop (more in line with the later tests) instead of SuddenClose,
to help distinguish that it wasn't closed in the protocol sense?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact