gemmellr commented on code in PR #5478: URL: https://github.com/apache/activemq-artemis/pull/5478#discussion_r1937135108
########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java: ########## @@ -137,51 +139,71 @@ protected void signalError(Exception cause) { @Override void registerFederationManagement() throws Exception { - // Not yet implemented for the target side of the federation connection + if (brokerConnection.isManagable()) { + AMQPFederationManagementSupport.registerFederationTarget(brokerConnection.getNodeId(), brokerConnection.getName(), this); + } } @Override void unregisterFederationManagement() throws Exception { - // Not yet implemented for the target side of the federation connection + if (brokerConnection.isManagable()) { + AMQPFederationManagementSupport.unregisterFederationTarget(brokerConnection.getNodeId(), brokerConnection.getName(), this); + } } @Override void registerLocalPolicyManagement(AMQPFederationLocalPolicyManager manager) throws Exception { - // Not yet implemented for the target side of the federation connection + if (brokerConnection.isManagable()) { + AMQPFederationManagementSupport.registerLocalPolicyOnTarget(brokerConnection.getNodeId(), brokerConnection.getName(), manager); + } } @Override void unregisterLocalPolicyManagement(AMQPFederationLocalPolicyManager manager) throws Exception { - // Not yet implemented for the target side of the federation connection + if (brokerConnection.isManagable()) { + AMQPFederationManagementSupport.unregisterLocalPolicyOnTarget(brokerConnection.getNodeId(), brokerConnection.getName(), manager); + } } @Override void registerRemotePolicyManagement(AMQPFederationRemotePolicyManager manager) throws Exception { - // Not yet implemented for the target side of the federation connection + if (brokerConnection.isManagable()) { + AMQPFederationManagementSupport.registerRemotePolicyOnTarget(brokerConnection.getNodeId(), brokerConnection.getName(), manager); + } } @Override void unregisterRemotePolicyManagement(AMQPFederationRemotePolicyManager manager) throws Exception { - // Not yet implemented for the target side of the federation connection + if (brokerConnection.isManagable()) { + AMQPFederationManagementSupport.unregisterRemotePolicyOnTarget(brokerConnection.getNodeId(), brokerConnection.getName(), manager); + } } @Override void registerFederationConsumerManagement(AMQPFederationConsumer consumer) throws Exception { - // Not yet implemented for the target side of the federation connection + if (brokerConnection.isManagable()) { + AMQPFederationManagementSupport.registerFederationTargetConsumer(brokerConnection.getNodeId(), brokerConnection.getName(), consumer); + } } @Override void unregisterFederationConsumerManagement(AMQPFederationConsumer consumer) throws Exception { - // Not yet implemented for the target side of the federation connection + if (brokerConnection.isManagable()) { + AMQPFederationManagementSupport.unregisterFederationTargetConsumer(brokerConnection.getNodeId(), brokerConnection.getName(), consumer); + } } @Override void registerFederationProducerManagement(AMQPFederationSenderController sender) throws Exception { - // Not yet implemented for the target side of the federation connection + if (brokerConnection.isManagable()) { + AMQPFederationManagementSupport.registerFederationTargetProducer(brokerConnection.getNodeId(), brokerConnection.getName(), sender); + } } @Override void unregisterFederationProdcerManagement(AMQPFederationSenderController sender) throws Exception { - // Not yet implemented for the target side of the federation connection + if (brokerConnection.isManagable()) { Review Comment: Not the change itself, but the method its in: typo Prodcer -> Producer. ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationManagementTest.java: ########## @@ -1695,4 +1710,305 @@ public void testRemoteQueueFederationTrackingCleanedUpOnBrokerConnectionStopped( peer.close(); } } + + @Test + @Timeout(20) + public void testRemoteBrokerRegistersAndRemovesRemoteAddressFederationBrokerConnectionInManagement() throws Exception { + server.start(); + server.addAddressInfo(new AddressInfo(getTestName()).addRoutingType(RoutingType.MULTICAST)); + + final String serverNodeId = server.getNodeID().toString(); + final String brokerConnectionName = getTestName(); + final String remoteBrokerConnectionName = ResourceNames.REMOTE_BROKER_CONNECTION + server.getNodeID() + "." + getTestName(); + final String federationResourceName = + AMQPFederationManagementSupport.getFederationTargetResourceName(serverNodeId, brokerConnectionName, getTestName()); + final String policyResourceName = AMQPFederationManagementSupport.getFederationTargetPolicyResourceName(serverNodeId, brokerConnectionName, getTestName(), "address-policy"); + final String producerResourceName = AMQPFederationManagementSupport.getFederationTargetAddressProducerResourceName(serverNodeId, brokerConnectionName, getTestName(), "address-policy", getTestName()); + + // Test registers and cleans up on connection closed by remote + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, serverNodeId, brokerConnectionName, getTestName()); + peer.connect("localhost", AMQP_PORT); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final RemoteBrokerConnectionControl remoteBrokerConnection = (RemoteBrokerConnectionControl) + server.getManagementService().getResource(remoteBrokerConnectionName); + + assertNotNull(remoteBrokerConnection); + assertEquals(getTestName(), remoteBrokerConnection.getName()); + assertEquals(server.getNodeID().toString(), remoteBrokerConnection.getNodeId()); + + final AMQPFederationControl federationControl = + (AMQPFederationControl) server.getManagementService().getResource(federationResourceName); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofSender().withName(getTestName()) + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withSource().withAddress(getTestName()); + + // Connect to remote as if an queue had demand and matched our federation policy + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName(getTestName()) + .withProperty(FEDERATION_POLICY_NAME.toString(), "address-policy") + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress(getTestName()) + .withCapabilities("topic") + .and() + .withTarget().and() + .now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final AMQPFederationRemotePolicyControlType remotePolicyControl = (AMQPFederationRemotePolicyControlType) + server.getManagementService().getResource(policyResourceName); + AMQPFederationProducerControl producerControl = (AMQPFederationProducerControl) + server.getManagementService().getResource(producerResourceName); + + assertNotNull(federationControl); + assertNotNull(remotePolicyControl); + assertNotNull(producerControl); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectClose(); + peer.remoteClose().now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + Wait.assertTrue(() -> server.getManagementService().getResource(remoteBrokerConnectionName) == null, 5_000, 50); + Wait.assertTrue(() -> server.getManagementService().getResource(federationResourceName) == null, 5_000, 50); + Wait.assertTrue(() -> server.getManagementService().getResource(policyResourceName) == null, 5_000, 50); + Wait.assertTrue(() -> server.getManagementService().getResource(producerResourceName) == null, 5_000, 50); + } + + // Test registers and cleans up on connection dropped + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, serverNodeId, brokerConnectionName, getTestName()); + peer.connect("localhost", AMQP_PORT); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final AMQPFederationControl federationControl = + (AMQPFederationControl) server.getManagementService().getResource(federationResourceName); + + assertNotNull(federationControl); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofSender().withName(getTestName()) + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withSource().withAddress(getTestName()); + + // Connect to remote as if an queue had demand and matched our federation policy + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName(getTestName()) + .withProperty(FEDERATION_POLICY_NAME.toString(), "address-policy") + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress(getTestName()) + .withCapabilities("topic") + .and() + .withTarget().and() + .now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final AMQPFederationRemotePolicyControlType remotePolicyControl = (AMQPFederationRemotePolicyControlType) + server.getManagementService().getResource(policyResourceName); + AMQPFederationProducerControl producerControl = (AMQPFederationProducerControl) + server.getManagementService().getResource(producerResourceName); + + assertNotNull(federationControl); + assertNotNull(remotePolicyControl); + assertNotNull(producerControl); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + Wait.assertTrue(() -> server.getManagementService().getResource(remoteBrokerConnectionName) == null, 5_000, 50); + Wait.assertTrue(() -> server.getManagementService().getResource(federationResourceName) == null, 5_000, 50); + } + } + + @Test + @Timeout(20) + public void testRemoteBrokerRegistersAndRemovesRemoteQueueFederationBrokerConnectionInManagement() throws Exception { + server.start(); + server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST) + .setAddress(getTestName()) + .setAutoCreated(false)); + + final String serverNodeId = server.getNodeID().toString(); + final String brokerConnectionName = getTestName(); + final String remoteBrokerConnectionName = ResourceNames.REMOTE_BROKER_CONNECTION + server.getNodeID() + "." + getTestName(); + final String federationResourceName = + AMQPFederationManagementSupport.getFederationTargetResourceName(serverNodeId, brokerConnectionName, getTestName()); + final String policyResourceName = AMQPFederationManagementSupport.getFederationTargetPolicyResourceName(serverNodeId, brokerConnectionName, getTestName(), "queue-policy"); + final String producerResourceName = AMQPFederationManagementSupport.getFederationTargetQueueProducerResourceName(serverNodeId, brokerConnectionName, getTestName(), "queue-policy", getTestName() + "::" + getTestName()); + + // Test registers and cleans up on connection closed by remote + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, serverNodeId, brokerConnectionName, getTestName()); + peer.connect("localhost", AMQP_PORT); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final RemoteBrokerConnectionControl remoteBrokerConnection = (RemoteBrokerConnectionControl) + server.getManagementService().getResource(remoteBrokerConnectionName); + + assertNotNull(remoteBrokerConnection); + assertEquals(getTestName(), remoteBrokerConnection.getName()); + assertEquals(server.getNodeID().toString(), remoteBrokerConnection.getNodeId()); + + final AMQPFederationControl federationControl = + (AMQPFederationControl) server.getManagementService().getResource(federationResourceName); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofSender().withName(getTestName() + "::" + getTestName()) + .withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString()) + .withSource().withAddress(getTestName() + "::" + getTestName()); + + // Connect to remote as if an queue had demand and matched our federation policy + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_QUEUE_RECEIVER.toString()) + .withName(getTestName() + "::" + getTestName()) + .withProperty(FEDERATION_RECEIVER_PRIORITY.toString(), DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT) + .withProperty(FEDERATION_POLICY_NAME.toString(), "queue-policy") + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress(getTestName() + "::" + getTestName()) + .withCapabilities("queue") + .and() + .withTarget().and() + .now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final AMQPFederationRemotePolicyControlType remotePolicyControl = (AMQPFederationRemotePolicyControlType) + server.getManagementService().getResource(policyResourceName); + AMQPFederationProducerControl producerControl = (AMQPFederationProducerControl) + server.getManagementService().getResource(producerResourceName); Review Comment: make both final? ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationManagementSupport.java: ########## @@ -141,10 +172,58 @@ public static ObjectName getFederationSourceObjectName(ManagementService managem ObjectName.quote(federationName))); } + /** + * Register the given {@link AMQPFederationTarget} instance with the broker management services. + * + * @param federation + * The federation target instance being registered with management. + * + * @throws Exception if an error occurs while registering the federation with the management services. + */ + public static void registerFederationTarget(String remoteNodeId, String brokerConnectionName, AMQPFederationTarget federation) throws Exception { + final String federationName = federation.getName(); + final ActiveMQServer server = federation.getServer(); + final ManagementService management = server.getManagementService(); + final AMQPFederationTargetControlType control = new AMQPFederationTargetControlType(server, federation); + + management.registerInJMX(getFederationTargetObjectName(management, remoteNodeId, brokerConnectionName, federationName), control); + management.registerInRegistry(getFederationTargetResourceName(remoteNodeId, brokerConnectionName, federationName), control); + } + + /** + * Unregister the given {@link AMQPFederationTarget} instance with the broker management services. + * + * @param federation + * The federation target instance being unregistered from management. + * + * @throws Exception if an error occurs while unregistering the federation with the management services. + */ + public static void unregisterFederationTarget(String remoteNodeId, String brokerConnectionName, AMQPFederationTarget federation) throws Exception { + final String federationName = federation.getName(); + final ActiveMQServer server = federation.getServer(); + final ManagementService management = server.getManagementService(); + + management.unregisterFromJMX(getFederationTargetObjectName(management, remoteNodeId, brokerConnectionName, federationName)); + management.unregisterFromRegistry(getFederationTargetResourceName(remoteNodeId, brokerConnectionName, federationName)); + } + + public static String getFederationTargetResourceName(String remoteNodeId, String brokerConnectionName, String federationName) { + return String.format(FEDERATION_TARGET_RESOURCE_TEMPLATE, remoteNodeId, brokerConnectionName, federationName); + } + + public static ObjectName getFederationTargetObjectName(ManagementService management, String remoteNodeId, String brokerConnection, String federationName) throws Exception { + final String brokerConnectionName = management.getObjectNameBuilder().getRemoteBrokerConnectionBaseObjectNameString(remoteNodeId, brokerConnection); Review Comment: Bit of a nitpick, but 'brokerConnectionName' keeps making me think its just the connection name, when its simply not...maybe make it 'brokerConnectionObjectName' for clarity ? (same in the several other places getRemoteBrokerConnectionBaseObjectNameString is called) ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationManagementTest.java: ########## @@ -1695,4 +1710,305 @@ public void testRemoteQueueFederationTrackingCleanedUpOnBrokerConnectionStopped( peer.close(); } } + + @Test + @Timeout(20) + public void testRemoteBrokerRegistersAndRemovesRemoteAddressFederationBrokerConnectionInManagement() throws Exception { + server.start(); + server.addAddressInfo(new AddressInfo(getTestName()).addRoutingType(RoutingType.MULTICAST)); + + final String serverNodeId = server.getNodeID().toString(); + final String brokerConnectionName = getTestName(); + final String remoteBrokerConnectionName = ResourceNames.REMOTE_BROKER_CONNECTION + server.getNodeID() + "." + getTestName(); + final String federationResourceName = + AMQPFederationManagementSupport.getFederationTargetResourceName(serverNodeId, brokerConnectionName, getTestName()); + final String policyResourceName = AMQPFederationManagementSupport.getFederationTargetPolicyResourceName(serverNodeId, brokerConnectionName, getTestName(), "address-policy"); + final String producerResourceName = AMQPFederationManagementSupport.getFederationTargetAddressProducerResourceName(serverNodeId, brokerConnectionName, getTestName(), "address-policy", getTestName()); + + // Test registers and cleans up on connection closed by remote + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, serverNodeId, brokerConnectionName, getTestName()); + peer.connect("localhost", AMQP_PORT); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final RemoteBrokerConnectionControl remoteBrokerConnection = (RemoteBrokerConnectionControl) + server.getManagementService().getResource(remoteBrokerConnectionName); + + assertNotNull(remoteBrokerConnection); + assertEquals(getTestName(), remoteBrokerConnection.getName()); + assertEquals(server.getNodeID().toString(), remoteBrokerConnection.getNodeId()); + + final AMQPFederationControl federationControl = + (AMQPFederationControl) server.getManagementService().getResource(federationResourceName); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofSender().withName(getTestName()) + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withSource().withAddress(getTestName()); + + // Connect to remote as if an queue had demand and matched our federation policy + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName(getTestName()) + .withProperty(FEDERATION_POLICY_NAME.toString(), "address-policy") + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress(getTestName()) + .withCapabilities("topic") + .and() + .withTarget().and() + .now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final AMQPFederationRemotePolicyControlType remotePolicyControl = (AMQPFederationRemotePolicyControlType) + server.getManagementService().getResource(policyResourceName); + AMQPFederationProducerControl producerControl = (AMQPFederationProducerControl) Review Comment: make both final? ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationManagementTest.java: ########## @@ -1695,4 +1710,305 @@ public void testRemoteQueueFederationTrackingCleanedUpOnBrokerConnectionStopped( peer.close(); } } + + @Test + @Timeout(20) + public void testRemoteBrokerRegistersAndRemovesRemoteAddressFederationBrokerConnectionInManagement() throws Exception { + server.start(); + server.addAddressInfo(new AddressInfo(getTestName()).addRoutingType(RoutingType.MULTICAST)); + + final String serverNodeId = server.getNodeID().toString(); + final String brokerConnectionName = getTestName(); + final String remoteBrokerConnectionName = ResourceNames.REMOTE_BROKER_CONNECTION + server.getNodeID() + "." + getTestName(); + final String federationResourceName = + AMQPFederationManagementSupport.getFederationTargetResourceName(serverNodeId, brokerConnectionName, getTestName()); + final String policyResourceName = AMQPFederationManagementSupport.getFederationTargetPolicyResourceName(serverNodeId, brokerConnectionName, getTestName(), "address-policy"); + final String producerResourceName = AMQPFederationManagementSupport.getFederationTargetAddressProducerResourceName(serverNodeId, brokerConnectionName, getTestName(), "address-policy", getTestName()); + + // Test registers and cleans up on connection closed by remote + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, serverNodeId, brokerConnectionName, getTestName()); + peer.connect("localhost", AMQP_PORT); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final RemoteBrokerConnectionControl remoteBrokerConnection = (RemoteBrokerConnectionControl) + server.getManagementService().getResource(remoteBrokerConnectionName); + + assertNotNull(remoteBrokerConnection); + assertEquals(getTestName(), remoteBrokerConnection.getName()); + assertEquals(server.getNodeID().toString(), remoteBrokerConnection.getNodeId()); + + final AMQPFederationControl federationControl = + (AMQPFederationControl) server.getManagementService().getResource(federationResourceName); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofSender().withName(getTestName()) + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withSource().withAddress(getTestName()); + + // Connect to remote as if an queue had demand and matched our federation policy + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName(getTestName()) + .withProperty(FEDERATION_POLICY_NAME.toString(), "address-policy") + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress(getTestName()) + .withCapabilities("topic") + .and() + .withTarget().and() + .now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final AMQPFederationRemotePolicyControlType remotePolicyControl = (AMQPFederationRemotePolicyControlType) + server.getManagementService().getResource(policyResourceName); + AMQPFederationProducerControl producerControl = (AMQPFederationProducerControl) + server.getManagementService().getResource(producerResourceName); + + assertNotNull(federationControl); + assertNotNull(remotePolicyControl); + assertNotNull(producerControl); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectClose(); + peer.remoteClose().now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + Wait.assertTrue(() -> server.getManagementService().getResource(remoteBrokerConnectionName) == null, 5_000, 50); + Wait.assertTrue(() -> server.getManagementService().getResource(federationResourceName) == null, 5_000, 50); + Wait.assertTrue(() -> server.getManagementService().getResource(policyResourceName) == null, 5_000, 50); + Wait.assertTrue(() -> server.getManagementService().getResource(producerResourceName) == null, 5_000, 50); + } + + // Test registers and cleans up on connection dropped + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, serverNodeId, brokerConnectionName, getTestName()); + peer.connect("localhost", AMQP_PORT); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final AMQPFederationControl federationControl = + (AMQPFederationControl) server.getManagementService().getResource(federationResourceName); + + assertNotNull(federationControl); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofSender().withName(getTestName()) + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withSource().withAddress(getTestName()); + + // Connect to remote as if an queue had demand and matched our federation policy + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName(getTestName()) + .withProperty(FEDERATION_POLICY_NAME.toString(), "address-policy") + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress(getTestName()) + .withCapabilities("topic") + .and() + .withTarget().and() + .now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final AMQPFederationRemotePolicyControlType remotePolicyControl = (AMQPFederationRemotePolicyControlType) + server.getManagementService().getResource(policyResourceName); + AMQPFederationProducerControl producerControl = (AMQPFederationProducerControl) + server.getManagementService().getResource(producerResourceName); Review Comment: make both final? ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationManagementTest.java: ########## @@ -1695,4 +1710,305 @@ public void testRemoteQueueFederationTrackingCleanedUpOnBrokerConnectionStopped( peer.close(); } } + + @Test + @Timeout(20) + public void testRemoteBrokerRegistersAndRemovesRemoteAddressFederationBrokerConnectionInManagement() throws Exception { + server.start(); + server.addAddressInfo(new AddressInfo(getTestName()).addRoutingType(RoutingType.MULTICAST)); + + final String serverNodeId = server.getNodeID().toString(); + final String brokerConnectionName = getTestName(); + final String remoteBrokerConnectionName = ResourceNames.REMOTE_BROKER_CONNECTION + server.getNodeID() + "." + getTestName(); + final String federationResourceName = + AMQPFederationManagementSupport.getFederationTargetResourceName(serverNodeId, brokerConnectionName, getTestName()); + final String policyResourceName = AMQPFederationManagementSupport.getFederationTargetPolicyResourceName(serverNodeId, brokerConnectionName, getTestName(), "address-policy"); + final String producerResourceName = AMQPFederationManagementSupport.getFederationTargetAddressProducerResourceName(serverNodeId, brokerConnectionName, getTestName(), "address-policy", getTestName()); + + // Test registers and cleans up on connection closed by remote + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, serverNodeId, brokerConnectionName, getTestName()); + peer.connect("localhost", AMQP_PORT); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final RemoteBrokerConnectionControl remoteBrokerConnection = (RemoteBrokerConnectionControl) + server.getManagementService().getResource(remoteBrokerConnectionName); + + assertNotNull(remoteBrokerConnection); + assertEquals(getTestName(), remoteBrokerConnection.getName()); + assertEquals(server.getNodeID().toString(), remoteBrokerConnection.getNodeId()); + + final AMQPFederationControl federationControl = + (AMQPFederationControl) server.getManagementService().getResource(federationResourceName); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofSender().withName(getTestName()) + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withSource().withAddress(getTestName()); + + // Connect to remote as if an queue had demand and matched our federation policy + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName(getTestName()) + .withProperty(FEDERATION_POLICY_NAME.toString(), "address-policy") + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress(getTestName()) + .withCapabilities("topic") + .and() + .withTarget().and() + .now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final AMQPFederationRemotePolicyControlType remotePolicyControl = (AMQPFederationRemotePolicyControlType) + server.getManagementService().getResource(policyResourceName); + AMQPFederationProducerControl producerControl = (AMQPFederationProducerControl) + server.getManagementService().getResource(producerResourceName); + + assertNotNull(federationControl); + assertNotNull(remotePolicyControl); + assertNotNull(producerControl); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectClose(); + peer.remoteClose().now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + Wait.assertTrue(() -> server.getManagementService().getResource(remoteBrokerConnectionName) == null, 5_000, 50); + Wait.assertTrue(() -> server.getManagementService().getResource(federationResourceName) == null, 5_000, 50); + Wait.assertTrue(() -> server.getManagementService().getResource(policyResourceName) == null, 5_000, 50); + Wait.assertTrue(() -> server.getManagementService().getResource(producerResourceName) == null, 5_000, 50); + } + + // Test registers and cleans up on connection dropped + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, serverNodeId, brokerConnectionName, getTestName()); + peer.connect("localhost", AMQP_PORT); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final AMQPFederationControl federationControl = + (AMQPFederationControl) server.getManagementService().getResource(federationResourceName); + + assertNotNull(federationControl); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofSender().withName(getTestName()) + .withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withSource().withAddress(getTestName()); + + // Connect to remote as if an queue had demand and matched our federation policy + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString()) + .withName(getTestName()) + .withProperty(FEDERATION_POLICY_NAME.toString(), "address-policy") + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress(getTestName()) + .withCapabilities("topic") + .and() + .withTarget().and() + .now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final AMQPFederationRemotePolicyControlType remotePolicyControl = (AMQPFederationRemotePolicyControlType) + server.getManagementService().getResource(policyResourceName); + AMQPFederationProducerControl producerControl = (AMQPFederationProducerControl) + server.getManagementService().getResource(producerResourceName); + + assertNotNull(federationControl); + assertNotNull(remotePolicyControl); + assertNotNull(producerControl); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + Wait.assertTrue(() -> server.getManagementService().getResource(remoteBrokerConnectionName) == null, 5_000, 50); + Wait.assertTrue(() -> server.getManagementService().getResource(federationResourceName) == null, 5_000, 50); + } + } + + @Test + @Timeout(20) + public void testRemoteBrokerRegistersAndRemovesRemoteQueueFederationBrokerConnectionInManagement() throws Exception { + server.start(); + server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST) + .setAddress(getTestName()) + .setAutoCreated(false)); + + final String serverNodeId = server.getNodeID().toString(); + final String brokerConnectionName = getTestName(); + final String remoteBrokerConnectionName = ResourceNames.REMOTE_BROKER_CONNECTION + server.getNodeID() + "." + getTestName(); + final String federationResourceName = + AMQPFederationManagementSupport.getFederationTargetResourceName(serverNodeId, brokerConnectionName, getTestName()); + final String policyResourceName = AMQPFederationManagementSupport.getFederationTargetPolicyResourceName(serverNodeId, brokerConnectionName, getTestName(), "queue-policy"); + final String producerResourceName = AMQPFederationManagementSupport.getFederationTargetQueueProducerResourceName(serverNodeId, brokerConnectionName, getTestName(), "queue-policy", getTestName() + "::" + getTestName()); + + // Test registers and cleans up on connection closed by remote + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, serverNodeId, brokerConnectionName, getTestName()); + peer.connect("localhost", AMQP_PORT); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final RemoteBrokerConnectionControl remoteBrokerConnection = (RemoteBrokerConnectionControl) + server.getManagementService().getResource(remoteBrokerConnectionName); + + assertNotNull(remoteBrokerConnection); + assertEquals(getTestName(), remoteBrokerConnection.getName()); + assertEquals(server.getNodeID().toString(), remoteBrokerConnection.getNodeId()); + + final AMQPFederationControl federationControl = + (AMQPFederationControl) server.getManagementService().getResource(federationResourceName); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofSender().withName(getTestName() + "::" + getTestName()) + .withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString()) + .withSource().withAddress(getTestName() + "::" + getTestName()); + + // Connect to remote as if an queue had demand and matched our federation policy + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_QUEUE_RECEIVER.toString()) + .withName(getTestName() + "::" + getTestName()) + .withProperty(FEDERATION_RECEIVER_PRIORITY.toString(), DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT) + .withProperty(FEDERATION_POLICY_NAME.toString(), "queue-policy") + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress(getTestName() + "::" + getTestName()) + .withCapabilities("queue") + .and() + .withTarget().and() + .now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final AMQPFederationRemotePolicyControlType remotePolicyControl = (AMQPFederationRemotePolicyControlType) + server.getManagementService().getResource(policyResourceName); + AMQPFederationProducerControl producerControl = (AMQPFederationProducerControl) + server.getManagementService().getResource(producerResourceName); + + assertNotNull(federationControl); + assertNotNull(remotePolicyControl); + assertNotNull(producerControl); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectClose(); + peer.remoteClose().now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.close(); + + Wait.assertTrue(() -> server.getManagementService().getResource(remoteBrokerConnectionName) == null, 5_000, 50); + Wait.assertTrue(() -> server.getManagementService().getResource(federationResourceName) == null, 5_000, 50); + Wait.assertTrue(() -> server.getManagementService().getResource(policyResourceName) == null, 5_000, 50); + Wait.assertTrue(() -> server.getManagementService().getResource(producerResourceName) == null, 5_000, 50); + } + + // Test registers and cleans up on connection dropped + try (ProtonTestClient peer = new ProtonTestClient()) { + scriptFederationConnectToRemote(peer, serverNodeId, brokerConnectionName, getTestName()); + peer.connect("localhost", AMQP_PORT); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final AMQPFederationControl federationControl = + (AMQPFederationControl) server.getManagementService().getResource(federationResourceName); + + assertNotNull(federationControl); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectAttach().ofSender().withName(getTestName() + "::" + getTestName()) + .withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString()) + .withSource().withAddress(getTestName() + "::" + getTestName()); + + // Connect to remote as if an queue had demand and matched our federation policy + peer.remoteAttach().ofReceiver() + .withDesiredCapabilities(FEDERATION_QUEUE_RECEIVER.toString()) + .withName(getTestName() + "::" + getTestName()) + .withProperty(FEDERATION_RECEIVER_PRIORITY.toString(), DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT) + .withProperty(FEDERATION_POLICY_NAME.toString(), "queue-policy") + .withSenderSettleModeUnsettled() + .withReceivervSettlesFirst() + .withSource().withDurabilityOfNone() + .withExpiryPolicyOnLinkDetach() + .withAddress(getTestName() + "::" + getTestName()) + .withCapabilities("queue") + .and() + .withTarget().and() + .now(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + final AMQPFederationRemotePolicyControlType remotePolicyControl = (AMQPFederationRemotePolicyControlType) + server.getManagementService().getResource(policyResourceName); + AMQPFederationProducerControl producerControl = (AMQPFederationProducerControl) + server.getManagementService().getResource(producerResourceName); Review Comment: make both final? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For additional commands, e-mail: gitbox-h...@activemq.apache.org For further information, visit: https://activemq.apache.org/contact