gemmellr commented on code in PR #5478:
URL: https://github.com/apache/activemq-artemis/pull/5478#discussion_r1937135108


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java:
##########
@@ -137,51 +139,71 @@ protected void signalError(Exception cause) {
 
    @Override
    void registerFederationManagement() throws Exception {
-      // Not yet implemented for the target side of the federation connection
+      if (brokerConnection.isManagable()) {
+         
AMQPFederationManagementSupport.registerFederationTarget(brokerConnection.getNodeId(),
 brokerConnection.getName(), this);
+      }
    }
 
    @Override
    void unregisterFederationManagement() throws Exception {
-      // Not yet implemented for the target side of the federation connection
+      if (brokerConnection.isManagable()) {
+         
AMQPFederationManagementSupport.unregisterFederationTarget(brokerConnection.getNodeId(),
 brokerConnection.getName(), this);
+      }
    }
 
    @Override
    void registerLocalPolicyManagement(AMQPFederationLocalPolicyManager 
manager) throws Exception {
-      // Not yet implemented for the target side of the federation connection
+      if (brokerConnection.isManagable()) {
+         
AMQPFederationManagementSupport.registerLocalPolicyOnTarget(brokerConnection.getNodeId(),
 brokerConnection.getName(), manager);
+      }
    }
 
    @Override
    void unregisterLocalPolicyManagement(AMQPFederationLocalPolicyManager 
manager) throws Exception {
-      // Not yet implemented for the target side of the federation connection
+      if (brokerConnection.isManagable()) {
+         
AMQPFederationManagementSupport.unregisterLocalPolicyOnTarget(brokerConnection.getNodeId(),
 brokerConnection.getName(), manager);
+      }
    }
 
    @Override
    void registerRemotePolicyManagement(AMQPFederationRemotePolicyManager 
manager) throws Exception {
-      // Not yet implemented for the target side of the federation connection
+      if (brokerConnection.isManagable()) {
+         
AMQPFederationManagementSupport.registerRemotePolicyOnTarget(brokerConnection.getNodeId(),
 brokerConnection.getName(), manager);
+      }
    }
 
    @Override
    void unregisterRemotePolicyManagement(AMQPFederationRemotePolicyManager 
manager) throws Exception {
-      // Not yet implemented for the target side of the federation connection
+      if (brokerConnection.isManagable()) {
+         
AMQPFederationManagementSupport.unregisterRemotePolicyOnTarget(brokerConnection.getNodeId(),
 brokerConnection.getName(), manager);
+      }
    }
 
    @Override
    void registerFederationConsumerManagement(AMQPFederationConsumer consumer) 
throws Exception {
-      // Not yet implemented for the target side of the federation connection
+      if (brokerConnection.isManagable()) {
+         
AMQPFederationManagementSupport.registerFederationTargetConsumer(brokerConnection.getNodeId(),
 brokerConnection.getName(), consumer);
+      }
    }
 
    @Override
    void unregisterFederationConsumerManagement(AMQPFederationConsumer 
consumer) throws Exception {
-      // Not yet implemented for the target side of the federation connection
+      if (brokerConnection.isManagable()) {
+         
AMQPFederationManagementSupport.unregisterFederationTargetConsumer(brokerConnection.getNodeId(),
 brokerConnection.getName(), consumer);
+      }
    }
 
    @Override
    void registerFederationProducerManagement(AMQPFederationSenderController 
sender) throws Exception {
-      // Not yet implemented for the target side of the federation connection
+      if (brokerConnection.isManagable()) {
+         
AMQPFederationManagementSupport.registerFederationTargetProducer(brokerConnection.getNodeId(),
 brokerConnection.getName(), sender);
+      }
    }
 
    @Override
    void unregisterFederationProdcerManagement(AMQPFederationSenderController 
sender) throws Exception {
-      // Not yet implemented for the target side of the federation connection
+      if (brokerConnection.isManagable()) {

Review Comment:
   Not the change itself, but the method it's in: typo Prodcer -> Producer.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationManagementTest.java:
##########
@@ -1695,4 +1710,305 @@ public void 
testRemoteQueueFederationTrackingCleanedUpOnBrokerConnectionStopped(
          peer.close();
       }
    }
+
+   @Test
+   @Timeout(20)
+   public void 
testRemoteBrokerRegistersAndRemovesRemoteAddressFederationBrokerConnectionInManagement()
 throws Exception {
+      server.start();
+      server.addAddressInfo(new 
AddressInfo(getTestName()).addRoutingType(RoutingType.MULTICAST));
+
+      final String serverNodeId = server.getNodeID().toString();
+      final String brokerConnectionName = getTestName();
+      final String remoteBrokerConnectionName = 
ResourceNames.REMOTE_BROKER_CONNECTION + server.getNodeID() + "." + 
getTestName();
+      final String federationResourceName =
+         
AMQPFederationManagementSupport.getFederationTargetResourceName(serverNodeId, 
brokerConnectionName, getTestName());
+      final String policyResourceName = 
AMQPFederationManagementSupport.getFederationTargetPolicyResourceName(serverNodeId,
 brokerConnectionName, getTestName(), "address-policy");
+      final String producerResourceName = 
AMQPFederationManagementSupport.getFederationTargetAddressProducerResourceName(serverNodeId,
 brokerConnectionName, getTestName(), "address-policy", getTestName());
+
+      // Test registers and cleans up on connection closed by remote
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, serverNodeId, 
brokerConnectionName, getTestName());
+         peer.connect("localhost", AMQP_PORT);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final RemoteBrokerConnectionControl remoteBrokerConnection = 
(RemoteBrokerConnectionControl)
+            
server.getManagementService().getResource(remoteBrokerConnectionName);
+
+         assertNotNull(remoteBrokerConnection);
+         assertEquals(getTestName(), remoteBrokerConnection.getName());
+         assertEquals(server.getNodeID().toString(), 
remoteBrokerConnection.getNodeId());
+
+         final AMQPFederationControl federationControl =
+            (AMQPFederationControl) 
server.getManagementService().getResource(federationResourceName);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofSender().withName(getTestName())
+                                       
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                                       
.withSource().withAddress(getTestName());
+
+         // Connect to remote as if an queue had demand and matched our 
federation policy
+         peer.remoteAttach().ofReceiver()
+                            
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withName(getTestName())
+                            .withProperty(FEDERATION_POLICY_NAME.toString(), 
"address-policy")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            .withSource().withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withAddress(getTestName())
+                                         .withCapabilities("topic")
+                                         .and()
+                            .withTarget().and()
+                            .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final AMQPFederationRemotePolicyControlType remotePolicyControl = 
(AMQPFederationRemotePolicyControlType)
+            server.getManagementService().getResource(policyResourceName);
+         AMQPFederationProducerControl producerControl = 
(AMQPFederationProducerControl)
+            server.getManagementService().getResource(producerResourceName);
+
+         assertNotNull(federationControl);
+         assertNotNull(remotePolicyControl);
+         assertNotNull(producerControl);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+
+         Wait.assertTrue(() -> 
server.getManagementService().getResource(remoteBrokerConnectionName) == null, 
5_000, 50);
+         Wait.assertTrue(() -> 
server.getManagementService().getResource(federationResourceName) == null, 
5_000, 50);
+         Wait.assertTrue(() -> 
server.getManagementService().getResource(policyResourceName) == null, 5_000, 
50);
+         Wait.assertTrue(() -> 
server.getManagementService().getResource(producerResourceName) == null, 5_000, 
50);
+      }
+
+      // Test registers and cleans up on connection dropped
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, serverNodeId, 
brokerConnectionName, getTestName());
+         peer.connect("localhost", AMQP_PORT);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final AMQPFederationControl federationControl =
+            (AMQPFederationControl) 
server.getManagementService().getResource(federationResourceName);
+
+         assertNotNull(federationControl);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofSender().withName(getTestName())
+                                       
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                                       
.withSource().withAddress(getTestName());
+
+         // Connect to remote as if an queue had demand and matched our 
federation policy
+         peer.remoteAttach().ofReceiver()
+                            
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withName(getTestName())
+                            .withProperty(FEDERATION_POLICY_NAME.toString(), 
"address-policy")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            .withSource().withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withAddress(getTestName())
+                                         .withCapabilities("topic")
+                                         .and()
+                            .withTarget().and()
+                            .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final AMQPFederationRemotePolicyControlType remotePolicyControl = 
(AMQPFederationRemotePolicyControlType)
+            server.getManagementService().getResource(policyResourceName);
+         AMQPFederationProducerControl producerControl = 
(AMQPFederationProducerControl)
+            server.getManagementService().getResource(producerResourceName);
+
+         assertNotNull(federationControl);
+         assertNotNull(remotePolicyControl);
+         assertNotNull(producerControl);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+
+         Wait.assertTrue(() -> 
server.getManagementService().getResource(remoteBrokerConnectionName) == null, 
5_000, 50);
+         Wait.assertTrue(() -> 
server.getManagementService().getResource(federationResourceName) == null, 
5_000, 50);
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testRemoteBrokerRegistersAndRemovesRemoteQueueFederationBrokerConnectionInManagement()
 throws Exception {
+      server.start();
+      
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST)
+                                                             
.setAddress(getTestName())
+                                                             
.setAutoCreated(false));
+
+      final String serverNodeId = server.getNodeID().toString();
+      final String brokerConnectionName = getTestName();
+      final String remoteBrokerConnectionName = 
ResourceNames.REMOTE_BROKER_CONNECTION + server.getNodeID() + "." + 
getTestName();
+      final String federationResourceName =
+         
AMQPFederationManagementSupport.getFederationTargetResourceName(serverNodeId, 
brokerConnectionName, getTestName());
+      final String policyResourceName = 
AMQPFederationManagementSupport.getFederationTargetPolicyResourceName(serverNodeId,
 brokerConnectionName, getTestName(), "queue-policy");
+      final String producerResourceName = 
AMQPFederationManagementSupport.getFederationTargetQueueProducerResourceName(serverNodeId,
 brokerConnectionName, getTestName(), "queue-policy", getTestName() + "::" + 
getTestName());
+
+      // Test registers and cleans up on connection closed by remote
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, serverNodeId, 
brokerConnectionName, getTestName());
+         peer.connect("localhost", AMQP_PORT);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final RemoteBrokerConnectionControl remoteBrokerConnection = 
(RemoteBrokerConnectionControl)
+            
server.getManagementService().getResource(remoteBrokerConnectionName);
+
+         assertNotNull(remoteBrokerConnection);
+         assertEquals(getTestName(), remoteBrokerConnection.getName());
+         assertEquals(server.getNodeID().toString(), 
remoteBrokerConnection.getNodeId());
+
+         final AMQPFederationControl federationControl =
+            (AMQPFederationControl) 
server.getManagementService().getResource(federationResourceName);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofSender().withName(getTestName() + "::" + 
getTestName())
+                                       
.withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString())
+                                       .withSource().withAddress(getTestName() 
+ "::" + getTestName());
+
+         // Connect to remote as if an queue had demand and matched our 
federation policy
+         peer.remoteAttach().ofReceiver()
+                            
.withDesiredCapabilities(FEDERATION_QUEUE_RECEIVER.toString())
+                            .withName(getTestName() + "::" + getTestName())
+                            
.withProperty(FEDERATION_RECEIVER_PRIORITY.toString(), 
DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT)
+                            .withProperty(FEDERATION_POLICY_NAME.toString(), 
"queue-policy")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            .withSource().withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withAddress(getTestName() + "::" + 
getTestName())
+                                         .withCapabilities("queue")
+                                         .and()
+                            .withTarget().and()
+                            .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final AMQPFederationRemotePolicyControlType remotePolicyControl = 
(AMQPFederationRemotePolicyControlType)
+            server.getManagementService().getResource(policyResourceName);
+         AMQPFederationProducerControl producerControl = 
(AMQPFederationProducerControl)
+            server.getManagementService().getResource(producerResourceName);

Review Comment:
   make both final?



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationManagementSupport.java:
##########
@@ -141,10 +172,58 @@ public static ObjectName 
getFederationSourceObjectName(ManagementService managem
             ObjectName.quote(federationName)));
    }
 
+   /**
+    * Register the given {@link AMQPFederationTarget} instance with the broker 
management services.
+    *
+    * @param federation
+    *    The federation target instance being registered with management.
+    *
+    * @throws Exception if an error occurs while registering the federation 
with the management services.
+    */
+   public static void registerFederationTarget(String remoteNodeId, String 
brokerConnectionName, AMQPFederationTarget federation) throws Exception {
+      final String federationName = federation.getName();
+      final ActiveMQServer server = federation.getServer();
+      final ManagementService management = server.getManagementService();
+      final AMQPFederationTargetControlType control = new 
AMQPFederationTargetControlType(server, federation);
+
+      management.registerInJMX(getFederationTargetObjectName(management, 
remoteNodeId, brokerConnectionName, federationName), control);
+      
management.registerInRegistry(getFederationTargetResourceName(remoteNodeId, 
brokerConnectionName, federationName), control);
+   }
+
+   /**
+    * Unregister the given {@link AMQPFederationTarget} instance with the 
broker management services.
+    *
+    * @param federation
+    *    The federation target instance being unregistered from management.
+    *
+    * @throws Exception if an error occurs while unregistering the federation 
with the management services.
+    */
+   public static void unregisterFederationTarget(String remoteNodeId, String 
brokerConnectionName, AMQPFederationTarget federation) throws Exception {
+      final String federationName = federation.getName();
+      final ActiveMQServer server = federation.getServer();
+      final ManagementService management = server.getManagementService();
+
+      management.unregisterFromJMX(getFederationTargetObjectName(management, 
remoteNodeId, brokerConnectionName, federationName));
+      
management.unregisterFromRegistry(getFederationTargetResourceName(remoteNodeId, 
brokerConnectionName, federationName));
+   }
+
+   public static String getFederationTargetResourceName(String remoteNodeId, 
String brokerConnectionName, String federationName) {
+      return String.format(FEDERATION_TARGET_RESOURCE_TEMPLATE, remoteNodeId, 
brokerConnectionName, federationName);
+   }
+
+   public static ObjectName getFederationTargetObjectName(ManagementService 
management, String remoteNodeId, String brokerConnection, String 
federationName) throws Exception {
+      final String brokerConnectionName = 
management.getObjectNameBuilder().getRemoteBrokerConnectionBaseObjectNameString(remoteNodeId,
 brokerConnection);

Review Comment:
   Bit of a nitpick, but 'brokerConnectionName' keeps making me think it's just 
the connection name, when it simply isn't... maybe make it 
'brokerConnectionObjectName' for clarity?
   
   (same in the several other places 
getRemoteBrokerConnectionBaseObjectNameString is called)



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationManagementTest.java:
##########
@@ -1695,4 +1710,305 @@ public void 
testRemoteQueueFederationTrackingCleanedUpOnBrokerConnectionStopped(
          peer.close();
       }
    }
+
+   @Test
+   @Timeout(20)
+   public void 
testRemoteBrokerRegistersAndRemovesRemoteAddressFederationBrokerConnectionInManagement()
 throws Exception {
+      server.start();
+      server.addAddressInfo(new 
AddressInfo(getTestName()).addRoutingType(RoutingType.MULTICAST));
+
+      final String serverNodeId = server.getNodeID().toString();
+      final String brokerConnectionName = getTestName();
+      final String remoteBrokerConnectionName = 
ResourceNames.REMOTE_BROKER_CONNECTION + server.getNodeID() + "." + 
getTestName();
+      final String federationResourceName =
+         
AMQPFederationManagementSupport.getFederationTargetResourceName(serverNodeId, 
brokerConnectionName, getTestName());
+      final String policyResourceName = 
AMQPFederationManagementSupport.getFederationTargetPolicyResourceName(serverNodeId,
 brokerConnectionName, getTestName(), "address-policy");
+      final String producerResourceName = 
AMQPFederationManagementSupport.getFederationTargetAddressProducerResourceName(serverNodeId,
 brokerConnectionName, getTestName(), "address-policy", getTestName());
+
+      // Test registers and cleans up on connection closed by remote
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, serverNodeId, 
brokerConnectionName, getTestName());
+         peer.connect("localhost", AMQP_PORT);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final RemoteBrokerConnectionControl remoteBrokerConnection = 
(RemoteBrokerConnectionControl)
+            
server.getManagementService().getResource(remoteBrokerConnectionName);
+
+         assertNotNull(remoteBrokerConnection);
+         assertEquals(getTestName(), remoteBrokerConnection.getName());
+         assertEquals(server.getNodeID().toString(), 
remoteBrokerConnection.getNodeId());
+
+         final AMQPFederationControl federationControl =
+            (AMQPFederationControl) 
server.getManagementService().getResource(federationResourceName);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofSender().withName(getTestName())
+                                       
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                                       
.withSource().withAddress(getTestName());
+
+         // Connect to remote as if an queue had demand and matched our 
federation policy
+         peer.remoteAttach().ofReceiver()
+                            
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withName(getTestName())
+                            .withProperty(FEDERATION_POLICY_NAME.toString(), 
"address-policy")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            .withSource().withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withAddress(getTestName())
+                                         .withCapabilities("topic")
+                                         .and()
+                            .withTarget().and()
+                            .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final AMQPFederationRemotePolicyControlType remotePolicyControl = 
(AMQPFederationRemotePolicyControlType)
+            server.getManagementService().getResource(policyResourceName);
+         AMQPFederationProducerControl producerControl = 
(AMQPFederationProducerControl)

Review Comment:
   make both final?



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationManagementTest.java:
##########
@@ -1695,4 +1710,305 @@ public void 
testRemoteQueueFederationTrackingCleanedUpOnBrokerConnectionStopped(
          peer.close();
       }
    }
+
+   @Test
+   @Timeout(20)
+   public void 
testRemoteBrokerRegistersAndRemovesRemoteAddressFederationBrokerConnectionInManagement()
 throws Exception {
+      server.start();
+      server.addAddressInfo(new 
AddressInfo(getTestName()).addRoutingType(RoutingType.MULTICAST));
+
+      final String serverNodeId = server.getNodeID().toString();
+      final String brokerConnectionName = getTestName();
+      final String remoteBrokerConnectionName = 
ResourceNames.REMOTE_BROKER_CONNECTION + server.getNodeID() + "." + 
getTestName();
+      final String federationResourceName =
+         
AMQPFederationManagementSupport.getFederationTargetResourceName(serverNodeId, 
brokerConnectionName, getTestName());
+      final String policyResourceName = 
AMQPFederationManagementSupport.getFederationTargetPolicyResourceName(serverNodeId,
 brokerConnectionName, getTestName(), "address-policy");
+      final String producerResourceName = 
AMQPFederationManagementSupport.getFederationTargetAddressProducerResourceName(serverNodeId,
 brokerConnectionName, getTestName(), "address-policy", getTestName());
+
+      // Test registers and cleans up on connection closed by remote
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, serverNodeId, 
brokerConnectionName, getTestName());
+         peer.connect("localhost", AMQP_PORT);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final RemoteBrokerConnectionControl remoteBrokerConnection = 
(RemoteBrokerConnectionControl)
+            
server.getManagementService().getResource(remoteBrokerConnectionName);
+
+         assertNotNull(remoteBrokerConnection);
+         assertEquals(getTestName(), remoteBrokerConnection.getName());
+         assertEquals(server.getNodeID().toString(), 
remoteBrokerConnection.getNodeId());
+
+         final AMQPFederationControl federationControl =
+            (AMQPFederationControl) 
server.getManagementService().getResource(federationResourceName);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofSender().withName(getTestName())
+                                       
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                                       
.withSource().withAddress(getTestName());
+
+         // Connect to remote as if an queue had demand and matched our 
federation policy
+         peer.remoteAttach().ofReceiver()
+                            
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withName(getTestName())
+                            .withProperty(FEDERATION_POLICY_NAME.toString(), 
"address-policy")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            .withSource().withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withAddress(getTestName())
+                                         .withCapabilities("topic")
+                                         .and()
+                            .withTarget().and()
+                            .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final AMQPFederationRemotePolicyControlType remotePolicyControl = 
(AMQPFederationRemotePolicyControlType)
+            server.getManagementService().getResource(policyResourceName);
+         AMQPFederationProducerControl producerControl = 
(AMQPFederationProducerControl)
+            server.getManagementService().getResource(producerResourceName);
+
+         assertNotNull(federationControl);
+         assertNotNull(remotePolicyControl);
+         assertNotNull(producerControl);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+
+         Wait.assertTrue(() -> 
server.getManagementService().getResource(remoteBrokerConnectionName) == null, 
5_000, 50);
+         Wait.assertTrue(() -> 
server.getManagementService().getResource(federationResourceName) == null, 
5_000, 50);
+         Wait.assertTrue(() -> 
server.getManagementService().getResource(policyResourceName) == null, 5_000, 
50);
+         Wait.assertTrue(() -> 
server.getManagementService().getResource(producerResourceName) == null, 5_000, 
50);
+      }
+
+      // Test registers and cleans up on connection dropped
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, serverNodeId, 
brokerConnectionName, getTestName());
+         peer.connect("localhost", AMQP_PORT);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final AMQPFederationControl federationControl =
+            (AMQPFederationControl) 
server.getManagementService().getResource(federationResourceName);
+
+         assertNotNull(federationControl);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofSender().withName(getTestName())
+                                       
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                                       
.withSource().withAddress(getTestName());
+
+         // Connect to remote as if an queue had demand and matched our 
federation policy
+         peer.remoteAttach().ofReceiver()
+                            
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withName(getTestName())
+                            .withProperty(FEDERATION_POLICY_NAME.toString(), 
"address-policy")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            .withSource().withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withAddress(getTestName())
+                                         .withCapabilities("topic")
+                                         .and()
+                            .withTarget().and()
+                            .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final AMQPFederationRemotePolicyControlType remotePolicyControl = 
(AMQPFederationRemotePolicyControlType)
+            server.getManagementService().getResource(policyResourceName);
+         AMQPFederationProducerControl producerControl = 
(AMQPFederationProducerControl)
+            server.getManagementService().getResource(producerResourceName);

Review Comment:
   make both final?



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationManagementTest.java:
##########
@@ -1695,4 +1710,305 @@ public void 
testRemoteQueueFederationTrackingCleanedUpOnBrokerConnectionStopped(
          peer.close();
       }
    }
+
+   @Test
+   @Timeout(20)
+   public void 
testRemoteBrokerRegistersAndRemovesRemoteAddressFederationBrokerConnectionInManagement()
 throws Exception {
+      server.start();
+      server.addAddressInfo(new 
AddressInfo(getTestName()).addRoutingType(RoutingType.MULTICAST));
+
+      final String serverNodeId = server.getNodeID().toString();
+      final String brokerConnectionName = getTestName();
+      final String remoteBrokerConnectionName = 
ResourceNames.REMOTE_BROKER_CONNECTION + server.getNodeID() + "." + 
getTestName();
+      final String federationResourceName =
+         
AMQPFederationManagementSupport.getFederationTargetResourceName(serverNodeId, 
brokerConnectionName, getTestName());
+      final String policyResourceName = 
AMQPFederationManagementSupport.getFederationTargetPolicyResourceName(serverNodeId,
 brokerConnectionName, getTestName(), "address-policy");
+      final String producerResourceName = 
AMQPFederationManagementSupport.getFederationTargetAddressProducerResourceName(serverNodeId,
 brokerConnectionName, getTestName(), "address-policy", getTestName());
+
+      // Test registers and cleans up on connection closed by remote
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, serverNodeId, 
brokerConnectionName, getTestName());
+         peer.connect("localhost", AMQP_PORT);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final RemoteBrokerConnectionControl remoteBrokerConnection = 
(RemoteBrokerConnectionControl)
+            
server.getManagementService().getResource(remoteBrokerConnectionName);
+
+         assertNotNull(remoteBrokerConnection);
+         assertEquals(getTestName(), remoteBrokerConnection.getName());
+         assertEquals(server.getNodeID().toString(), 
remoteBrokerConnection.getNodeId());
+
+         final AMQPFederationControl federationControl =
+            (AMQPFederationControl) 
server.getManagementService().getResource(federationResourceName);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofSender().withName(getTestName())
+                                       
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                                       
.withSource().withAddress(getTestName());
+
+         // Connect to remote as if a queue had demand and matched our 
federation policy
+         peer.remoteAttach().ofReceiver()
+                            
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withName(getTestName())
+                            .withProperty(FEDERATION_POLICY_NAME.toString(), 
"address-policy")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            .withSource().withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withAddress(getTestName())
+                                         .withCapabilities("topic")
+                                         .and()
+                            .withTarget().and()
+                            .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final AMQPFederationRemotePolicyControlType remotePolicyControl = 
(AMQPFederationRemotePolicyControlType)
+            server.getManagementService().getResource(policyResourceName);
+         AMQPFederationProducerControl producerControl = 
(AMQPFederationProducerControl)
+            server.getManagementService().getResource(producerResourceName);
+
+         assertNotNull(federationControl);
+         assertNotNull(remotePolicyControl);
+         assertNotNull(producerControl);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+
+         Wait.assertTrue(() -> 
server.getManagementService().getResource(remoteBrokerConnectionName) == null, 
5_000, 50);
+         Wait.assertTrue(() -> 
server.getManagementService().getResource(federationResourceName) == null, 
5_000, 50);
+         Wait.assertTrue(() -> 
server.getManagementService().getResource(policyResourceName) == null, 5_000, 
50);
+         Wait.assertTrue(() -> 
server.getManagementService().getResource(producerResourceName) == null, 5_000, 
50);
+      }
+
+      // Test registers and cleans up on connection dropped
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, serverNodeId, 
brokerConnectionName, getTestName());
+         peer.connect("localhost", AMQP_PORT);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final AMQPFederationControl federationControl =
+            (AMQPFederationControl) 
server.getManagementService().getResource(federationResourceName);
+
+         assertNotNull(federationControl);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofSender().withName(getTestName())
+                                       
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                                       
.withSource().withAddress(getTestName());
+
+         // Connect to remote as if a queue had demand and matched our 
federation policy
+         peer.remoteAttach().ofReceiver()
+                            
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+                            .withName(getTestName())
+                            .withProperty(FEDERATION_POLICY_NAME.toString(), 
"address-policy")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            .withSource().withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withAddress(getTestName())
+                                         .withCapabilities("topic")
+                                         .and()
+                            .withTarget().and()
+                            .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final AMQPFederationRemotePolicyControlType remotePolicyControl = 
(AMQPFederationRemotePolicyControlType)
+            server.getManagementService().getResource(policyResourceName);
+         AMQPFederationProducerControl producerControl = 
(AMQPFederationProducerControl)
+            server.getManagementService().getResource(producerResourceName);
+
+         assertNotNull(federationControl);
+         assertNotNull(remotePolicyControl);
+         assertNotNull(producerControl);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+
+         Wait.assertTrue(() -> 
server.getManagementService().getResource(remoteBrokerConnectionName) == null, 
5_000, 50);
+         Wait.assertTrue(() -> 
server.getManagementService().getResource(federationResourceName) == null, 
5_000, 50);
+      }
+   }
+
+   @Test
+   @Timeout(20)
+   public void 
testRemoteBrokerRegistersAndRemovesRemoteQueueFederationBrokerConnectionInManagement()
 throws Exception {
+      server.start();
+      
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST)
+                                                             
.setAddress(getTestName())
+                                                             
.setAutoCreated(false));
+
+      final String serverNodeId = server.getNodeID().toString();
+      final String brokerConnectionName = getTestName();
+      final String remoteBrokerConnectionName = 
ResourceNames.REMOTE_BROKER_CONNECTION + server.getNodeID() + "." + 
getTestName();
+      final String federationResourceName =
+         
AMQPFederationManagementSupport.getFederationTargetResourceName(serverNodeId, 
brokerConnectionName, getTestName());
+      final String policyResourceName = 
AMQPFederationManagementSupport.getFederationTargetPolicyResourceName(serverNodeId,
 brokerConnectionName, getTestName(), "queue-policy");
+      final String producerResourceName = 
AMQPFederationManagementSupport.getFederationTargetQueueProducerResourceName(serverNodeId,
 brokerConnectionName, getTestName(), "queue-policy", getTestName() + "::" + 
getTestName());
+
+      // Test registers and cleans up on connection closed by remote
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, serverNodeId, 
brokerConnectionName, getTestName());
+         peer.connect("localhost", AMQP_PORT);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final RemoteBrokerConnectionControl remoteBrokerConnection = 
(RemoteBrokerConnectionControl)
+            
server.getManagementService().getResource(remoteBrokerConnectionName);
+
+         assertNotNull(remoteBrokerConnection);
+         assertEquals(getTestName(), remoteBrokerConnection.getName());
+         assertEquals(server.getNodeID().toString(), 
remoteBrokerConnection.getNodeId());
+
+         final AMQPFederationControl federationControl =
+            (AMQPFederationControl) 
server.getManagementService().getResource(federationResourceName);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofSender().withName(getTestName() + "::" + 
getTestName())
+                                       
.withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString())
+                                       .withSource().withAddress(getTestName() 
+ "::" + getTestName());
+
+         // Connect to remote as if a queue had demand and matched our 
federation policy
+         peer.remoteAttach().ofReceiver()
+                            
.withDesiredCapabilities(FEDERATION_QUEUE_RECEIVER.toString())
+                            .withName(getTestName() + "::" + getTestName())
+                            
.withProperty(FEDERATION_RECEIVER_PRIORITY.toString(), 
DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT)
+                            .withProperty(FEDERATION_POLICY_NAME.toString(), 
"queue-policy")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            .withSource().withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withAddress(getTestName() + "::" + 
getTestName())
+                                         .withCapabilities("queue")
+                                         .and()
+                            .withTarget().and()
+                            .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final AMQPFederationRemotePolicyControlType remotePolicyControl = 
(AMQPFederationRemotePolicyControlType)
+            server.getManagementService().getResource(policyResourceName);
+         AMQPFederationProducerControl producerControl = 
(AMQPFederationProducerControl)
+            server.getManagementService().getResource(producerResourceName);
+
+         assertNotNull(federationControl);
+         assertNotNull(remotePolicyControl);
+         assertNotNull(producerControl);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.close();
+
+         Wait.assertTrue(() -> 
server.getManagementService().getResource(remoteBrokerConnectionName) == null, 
5_000, 50);
+         Wait.assertTrue(() -> 
server.getManagementService().getResource(federationResourceName) == null, 
5_000, 50);
+         Wait.assertTrue(() -> 
server.getManagementService().getResource(policyResourceName) == null, 5_000, 
50);
+         Wait.assertTrue(() -> 
server.getManagementService().getResource(producerResourceName) == null, 5_000, 
50);
+      }
+
+      // Test registers and cleans up on connection dropped
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         scriptFederationConnectToRemote(peer, serverNodeId, 
brokerConnectionName, getTestName());
+         peer.connect("localhost", AMQP_PORT);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final AMQPFederationControl federationControl =
+            (AMQPFederationControl) 
server.getManagementService().getResource(federationResourceName);
+
+         assertNotNull(federationControl);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofSender().withName(getTestName() + "::" + 
getTestName())
+                                       
.withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString())
+                                       .withSource().withAddress(getTestName() 
+ "::" + getTestName());
+
+         // Connect to remote as if a queue had demand and matched our 
federation policy
+         peer.remoteAttach().ofReceiver()
+                            
.withDesiredCapabilities(FEDERATION_QUEUE_RECEIVER.toString())
+                            .withName(getTestName() + "::" + getTestName())
+                            
.withProperty(FEDERATION_RECEIVER_PRIORITY.toString(), 
DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT)
+                            .withProperty(FEDERATION_POLICY_NAME.toString(), 
"queue-policy")
+                            .withSenderSettleModeUnsettled()
+                            .withReceivervSettlesFirst()
+                            .withSource().withDurabilityOfNone()
+                                         .withExpiryPolicyOnLinkDetach()
+                                         .withAddress(getTestName() + "::" + 
getTestName())
+                                         .withCapabilities("queue")
+                                         .and()
+                            .withTarget().and()
+                            .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         final AMQPFederationRemotePolicyControlType remotePolicyControl = 
(AMQPFederationRemotePolicyControlType)
+            server.getManagementService().getResource(policyResourceName);
+         AMQPFederationProducerControl producerControl = 
(AMQPFederationProducerControl)
+            server.getManagementService().getResource(producerResourceName);

Review Comment:
   Make both of these variables final?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org
For additional commands, e-mail: gitbox-h...@activemq.apache.org
For further information, visit: https://activemq.apache.org/contact



Reply via email to