gemmellr commented on code in PR #5478:
URL: https://github.com/apache/activemq-artemis/pull/5478#discussion_r1937135108
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java:
##########
@@ -137,51 +139,71 @@ protected void signalError(Exception cause) {
@Override
void registerFederationManagement() throws Exception {
- // Not yet implemented for the target side of the federation connection
+ if (brokerConnection.isManagable()) {
+
AMQPFederationManagementSupport.registerFederationTarget(brokerConnection.getNodeId(),
brokerConnection.getName(), this);
+ }
}
@Override
void unregisterFederationManagement() throws Exception {
- // Not yet implemented for the target side of the federation connection
+ if (brokerConnection.isManagable()) {
+
AMQPFederationManagementSupport.unregisterFederationTarget(brokerConnection.getNodeId(),
brokerConnection.getName(), this);
+ }
}
@Override
void registerLocalPolicyManagement(AMQPFederationLocalPolicyManager
manager) throws Exception {
- // Not yet implemented for the target side of the federation connection
+ if (brokerConnection.isManagable()) {
+
AMQPFederationManagementSupport.registerLocalPolicyOnTarget(brokerConnection.getNodeId(),
brokerConnection.getName(), manager);
+ }
}
@Override
void unregisterLocalPolicyManagement(AMQPFederationLocalPolicyManager
manager) throws Exception {
- // Not yet implemented for the target side of the federation connection
+ if (brokerConnection.isManagable()) {
+
AMQPFederationManagementSupport.unregisterLocalPolicyOnTarget(brokerConnection.getNodeId(),
brokerConnection.getName(), manager);
+ }
}
@Override
void registerRemotePolicyManagement(AMQPFederationRemotePolicyManager
manager) throws Exception {
- // Not yet implemented for the target side of the federation connection
+ if (brokerConnection.isManagable()) {
+
AMQPFederationManagementSupport.registerRemotePolicyOnTarget(brokerConnection.getNodeId(),
brokerConnection.getName(), manager);
+ }
}
@Override
void unregisterRemotePolicyManagement(AMQPFederationRemotePolicyManager
manager) throws Exception {
- // Not yet implemented for the target side of the federation connection
+ if (brokerConnection.isManagable()) {
+
AMQPFederationManagementSupport.unregisterRemotePolicyOnTarget(brokerConnection.getNodeId(),
brokerConnection.getName(), manager);
+ }
}
@Override
void registerFederationConsumerManagement(AMQPFederationConsumer consumer)
throws Exception {
- // Not yet implemented for the target side of the federation connection
+ if (brokerConnection.isManagable()) {
+
AMQPFederationManagementSupport.registerFederationTargetConsumer(brokerConnection.getNodeId(),
brokerConnection.getName(), consumer);
+ }
}
@Override
void unregisterFederationConsumerManagement(AMQPFederationConsumer
consumer) throws Exception {
- // Not yet implemented for the target side of the federation connection
+ if (brokerConnection.isManagable()) {
+
AMQPFederationManagementSupport.unregisterFederationTargetConsumer(brokerConnection.getNodeId(),
brokerConnection.getName(), consumer);
+ }
}
@Override
void registerFederationProducerManagement(AMQPFederationSenderController
sender) throws Exception {
- // Not yet implemented for the target side of the federation connection
+ if (brokerConnection.isManagable()) {
+
AMQPFederationManagementSupport.registerFederationTargetProducer(brokerConnection.getNodeId(),
brokerConnection.getName(), sender);
+ }
}
@Override
void unregisterFederationProdcerManagement(AMQPFederationSenderController
sender) throws Exception {
- // Not yet implemented for the target side of the federation connection
+ if (brokerConnection.isManagable()) {
Review Comment:
Not the change itself, but the method it's in has a typo: Prodcer -> Producer.
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationManagementTest.java:
##########
@@ -1695,4 +1710,305 @@ public void
testRemoteQueueFederationTrackingCleanedUpOnBrokerConnectionStopped(
peer.close();
}
}
+
+ @Test
+ @Timeout(20)
+ public void
testRemoteBrokerRegistersAndRemovesRemoteAddressFederationBrokerConnectionInManagement()
throws Exception {
+ server.start();
+ server.addAddressInfo(new
AddressInfo(getTestName()).addRoutingType(RoutingType.MULTICAST));
+
+ final String serverNodeId = server.getNodeID().toString();
+ final String brokerConnectionName = getTestName();
+ final String remoteBrokerConnectionName =
ResourceNames.REMOTE_BROKER_CONNECTION + server.getNodeID() + "." +
getTestName();
+ final String federationResourceName =
+
AMQPFederationManagementSupport.getFederationTargetResourceName(serverNodeId,
brokerConnectionName, getTestName());
+ final String policyResourceName =
AMQPFederationManagementSupport.getFederationTargetPolicyResourceName(serverNodeId,
brokerConnectionName, getTestName(), "address-policy");
+ final String producerResourceName =
AMQPFederationManagementSupport.getFederationTargetAddressProducerResourceName(serverNodeId,
brokerConnectionName, getTestName(), "address-policy", getTestName());
+
+ // Test registers and cleans up on connection closed by remote
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, serverNodeId,
brokerConnectionName, getTestName());
+ peer.connect("localhost", AMQP_PORT);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final RemoteBrokerConnectionControl remoteBrokerConnection =
(RemoteBrokerConnectionControl)
+
server.getManagementService().getResource(remoteBrokerConnectionName);
+
+ assertNotNull(remoteBrokerConnection);
+ assertEquals(getTestName(), remoteBrokerConnection.getName());
+ assertEquals(server.getNodeID().toString(),
remoteBrokerConnection.getNodeId());
+
+ final AMQPFederationControl federationControl =
+ (AMQPFederationControl)
server.getManagementService().getResource(federationResourceName);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofSender().withName(getTestName())
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+
.withSource().withAddress(getTestName());
+
+ // Connect to remote as if an queue had demand and matched our
federation policy
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName(getTestName())
+ .withProperty(FEDERATION_POLICY_NAME.toString(),
"address-policy")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withAddress(getTestName())
+ .withCapabilities("topic")
+ .and()
+ .withTarget().and()
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final AMQPFederationRemotePolicyControlType remotePolicyControl =
(AMQPFederationRemotePolicyControlType)
+ server.getManagementService().getResource(policyResourceName);
+ AMQPFederationProducerControl producerControl =
(AMQPFederationProducerControl)
+ server.getManagementService().getResource(producerResourceName);
+
+ assertNotNull(federationControl);
+ assertNotNull(remotePolicyControl);
+ assertNotNull(producerControl);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ Wait.assertTrue(() ->
server.getManagementService().getResource(remoteBrokerConnectionName) == null,
5_000, 50);
+ Wait.assertTrue(() ->
server.getManagementService().getResource(federationResourceName) == null,
5_000, 50);
+ Wait.assertTrue(() ->
server.getManagementService().getResource(policyResourceName) == null, 5_000,
50);
+ Wait.assertTrue(() ->
server.getManagementService().getResource(producerResourceName) == null, 5_000,
50);
+ }
+
+ // Test registers and cleans up on connection dropped
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, serverNodeId,
brokerConnectionName, getTestName());
+ peer.connect("localhost", AMQP_PORT);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final AMQPFederationControl federationControl =
+ (AMQPFederationControl)
server.getManagementService().getResource(federationResourceName);
+
+ assertNotNull(federationControl);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofSender().withName(getTestName())
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+
.withSource().withAddress(getTestName());
+
+ // Connect to remote as if an queue had demand and matched our
federation policy
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName(getTestName())
+ .withProperty(FEDERATION_POLICY_NAME.toString(),
"address-policy")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withAddress(getTestName())
+ .withCapabilities("topic")
+ .and()
+ .withTarget().and()
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final AMQPFederationRemotePolicyControlType remotePolicyControl =
(AMQPFederationRemotePolicyControlType)
+ server.getManagementService().getResource(policyResourceName);
+ AMQPFederationProducerControl producerControl =
(AMQPFederationProducerControl)
+ server.getManagementService().getResource(producerResourceName);
+
+ assertNotNull(federationControl);
+ assertNotNull(remotePolicyControl);
+ assertNotNull(producerControl);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ Wait.assertTrue(() ->
server.getManagementService().getResource(remoteBrokerConnectionName) == null,
5_000, 50);
+ Wait.assertTrue(() ->
server.getManagementService().getResource(federationResourceName) == null,
5_000, 50);
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void
testRemoteBrokerRegistersAndRemovesRemoteQueueFederationBrokerConnectionInManagement()
throws Exception {
+ server.start();
+
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
+ final String serverNodeId = server.getNodeID().toString();
+ final String brokerConnectionName = getTestName();
+ final String remoteBrokerConnectionName =
ResourceNames.REMOTE_BROKER_CONNECTION + server.getNodeID() + "." +
getTestName();
+ final String federationResourceName =
+
AMQPFederationManagementSupport.getFederationTargetResourceName(serverNodeId,
brokerConnectionName, getTestName());
+ final String policyResourceName =
AMQPFederationManagementSupport.getFederationTargetPolicyResourceName(serverNodeId,
brokerConnectionName, getTestName(), "queue-policy");
+ final String producerResourceName =
AMQPFederationManagementSupport.getFederationTargetQueueProducerResourceName(serverNodeId,
brokerConnectionName, getTestName(), "queue-policy", getTestName() + "::" +
getTestName());
+
+ // Test registers and cleans up on connection closed by remote
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, serverNodeId,
brokerConnectionName, getTestName());
+ peer.connect("localhost", AMQP_PORT);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final RemoteBrokerConnectionControl remoteBrokerConnection =
(RemoteBrokerConnectionControl)
+
server.getManagementService().getResource(remoteBrokerConnectionName);
+
+ assertNotNull(remoteBrokerConnection);
+ assertEquals(getTestName(), remoteBrokerConnection.getName());
+ assertEquals(server.getNodeID().toString(),
remoteBrokerConnection.getNodeId());
+
+ final AMQPFederationControl federationControl =
+ (AMQPFederationControl)
server.getManagementService().getResource(federationResourceName);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofSender().withName(getTestName() + "::" +
getTestName())
+
.withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString())
+ .withSource().withAddress(getTestName()
+ "::" + getTestName());
+
+ // Connect to remote as if an queue had demand and matched our
federation policy
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_QUEUE_RECEIVER.toString())
+ .withName(getTestName() + "::" + getTestName())
+
.withProperty(FEDERATION_RECEIVER_PRIORITY.toString(),
DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT)
+ .withProperty(FEDERATION_POLICY_NAME.toString(),
"queue-policy")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withAddress(getTestName() + "::" +
getTestName())
+ .withCapabilities("queue")
+ .and()
+ .withTarget().and()
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final AMQPFederationRemotePolicyControlType remotePolicyControl =
(AMQPFederationRemotePolicyControlType)
+ server.getManagementService().getResource(policyResourceName);
+ AMQPFederationProducerControl producerControl =
(AMQPFederationProducerControl)
+ server.getManagementService().getResource(producerResourceName);
Review Comment:
Make both final? ('remotePolicyControl' is declared final, but
'producerControl' is not.)
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationManagementSupport.java:
##########
@@ -141,10 +172,58 @@ public static ObjectName
getFederationSourceObjectName(ManagementService managem
ObjectName.quote(federationName)));
}
+ /**
+ * Register the given {@link AMQPFederationTarget} instance with the broker
management services.
+ *
+ * @param federation
+ * The federation target instance being registered with management.
+ *
+ * @throws Exception if an error occurs while registering the federation
with the management services.
+ */
+ public static void registerFederationTarget(String remoteNodeId, String
brokerConnectionName, AMQPFederationTarget federation) throws Exception {
+ final String federationName = federation.getName();
+ final ActiveMQServer server = federation.getServer();
+ final ManagementService management = server.getManagementService();
+ final AMQPFederationTargetControlType control = new
AMQPFederationTargetControlType(server, federation);
+
+ management.registerInJMX(getFederationTargetObjectName(management,
remoteNodeId, brokerConnectionName, federationName), control);
+
management.registerInRegistry(getFederationTargetResourceName(remoteNodeId,
brokerConnectionName, federationName), control);
+ }
+
+ /**
+ * Unregister the given {@link AMQPFederationTarget} instance with the
broker management services.
+ *
+ * @param federation
+ * The federation target instance being unregistered from management.
+ *
+ * @throws Exception if an error occurs while unregistering the federation
with the management services.
+ */
+ public static void unregisterFederationTarget(String remoteNodeId, String
brokerConnectionName, AMQPFederationTarget federation) throws Exception {
+ final String federationName = federation.getName();
+ final ActiveMQServer server = federation.getServer();
+ final ManagementService management = server.getManagementService();
+
+ management.unregisterFromJMX(getFederationTargetObjectName(management,
remoteNodeId, brokerConnectionName, federationName));
+
management.unregisterFromRegistry(getFederationTargetResourceName(remoteNodeId,
brokerConnectionName, federationName));
+ }
+
+ public static String getFederationTargetResourceName(String remoteNodeId,
String brokerConnectionName, String federationName) {
+ return String.format(FEDERATION_TARGET_RESOURCE_TEMPLATE, remoteNodeId,
brokerConnectionName, federationName);
+ }
+
+ public static ObjectName getFederationTargetObjectName(ManagementService
management, String remoteNodeId, String brokerConnection, String
federationName) throws Exception {
+ final String brokerConnectionName =
management.getObjectNameBuilder().getRemoteBrokerConnectionBaseObjectNameString(remoteNodeId,
brokerConnection);
Review Comment:
Bit of a nitpick, but 'brokerConnectionName' keeps making me think it's just
the connection name, when it simply isn't... maybe make it
'brokerConnectionObjectName' for clarity?
(Same in the several other places where
getRemoteBrokerConnectionBaseObjectNameString is called.)
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationManagementTest.java:
##########
@@ -1695,4 +1710,305 @@ public void
testRemoteQueueFederationTrackingCleanedUpOnBrokerConnectionStopped(
peer.close();
}
}
+
+ @Test
+ @Timeout(20)
+ public void
testRemoteBrokerRegistersAndRemovesRemoteAddressFederationBrokerConnectionInManagement()
throws Exception {
+ server.start();
+ server.addAddressInfo(new
AddressInfo(getTestName()).addRoutingType(RoutingType.MULTICAST));
+
+ final String serverNodeId = server.getNodeID().toString();
+ final String brokerConnectionName = getTestName();
+ final String remoteBrokerConnectionName =
ResourceNames.REMOTE_BROKER_CONNECTION + server.getNodeID() + "." +
getTestName();
+ final String federationResourceName =
+
AMQPFederationManagementSupport.getFederationTargetResourceName(serverNodeId,
brokerConnectionName, getTestName());
+ final String policyResourceName =
AMQPFederationManagementSupport.getFederationTargetPolicyResourceName(serverNodeId,
brokerConnectionName, getTestName(), "address-policy");
+ final String producerResourceName =
AMQPFederationManagementSupport.getFederationTargetAddressProducerResourceName(serverNodeId,
brokerConnectionName, getTestName(), "address-policy", getTestName());
+
+ // Test registers and cleans up on connection closed by remote
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, serverNodeId,
brokerConnectionName, getTestName());
+ peer.connect("localhost", AMQP_PORT);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final RemoteBrokerConnectionControl remoteBrokerConnection =
(RemoteBrokerConnectionControl)
+
server.getManagementService().getResource(remoteBrokerConnectionName);
+
+ assertNotNull(remoteBrokerConnection);
+ assertEquals(getTestName(), remoteBrokerConnection.getName());
+ assertEquals(server.getNodeID().toString(),
remoteBrokerConnection.getNodeId());
+
+ final AMQPFederationControl federationControl =
+ (AMQPFederationControl)
server.getManagementService().getResource(federationResourceName);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofSender().withName(getTestName())
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+
.withSource().withAddress(getTestName());
+
+ // Connect to remote as if an queue had demand and matched our
federation policy
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName(getTestName())
+ .withProperty(FEDERATION_POLICY_NAME.toString(),
"address-policy")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withAddress(getTestName())
+ .withCapabilities("topic")
+ .and()
+ .withTarget().and()
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final AMQPFederationRemotePolicyControlType remotePolicyControl =
(AMQPFederationRemotePolicyControlType)
+ server.getManagementService().getResource(policyResourceName);
+ AMQPFederationProducerControl producerControl =
(AMQPFederationProducerControl)
Review Comment:
Make both final? ('remotePolicyControl' is declared final, but
'producerControl' is not.)
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationManagementTest.java:
##########
@@ -1695,4 +1710,305 @@ public void
testRemoteQueueFederationTrackingCleanedUpOnBrokerConnectionStopped(
peer.close();
}
}
+
+ @Test
+ @Timeout(20)
+ public void
testRemoteBrokerRegistersAndRemovesRemoteAddressFederationBrokerConnectionInManagement()
throws Exception {
+ server.start();
+ server.addAddressInfo(new
AddressInfo(getTestName()).addRoutingType(RoutingType.MULTICAST));
+
+ final String serverNodeId = server.getNodeID().toString();
+ final String brokerConnectionName = getTestName();
+ final String remoteBrokerConnectionName =
ResourceNames.REMOTE_BROKER_CONNECTION + server.getNodeID() + "." +
getTestName();
+ final String federationResourceName =
+
AMQPFederationManagementSupport.getFederationTargetResourceName(serverNodeId,
brokerConnectionName, getTestName());
+ final String policyResourceName =
AMQPFederationManagementSupport.getFederationTargetPolicyResourceName(serverNodeId,
brokerConnectionName, getTestName(), "address-policy");
+ final String producerResourceName =
AMQPFederationManagementSupport.getFederationTargetAddressProducerResourceName(serverNodeId,
brokerConnectionName, getTestName(), "address-policy", getTestName());
+
+ // Test registers and cleans up on connection closed by remote
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, serverNodeId,
brokerConnectionName, getTestName());
+ peer.connect("localhost", AMQP_PORT);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final RemoteBrokerConnectionControl remoteBrokerConnection =
(RemoteBrokerConnectionControl)
+
server.getManagementService().getResource(remoteBrokerConnectionName);
+
+ assertNotNull(remoteBrokerConnection);
+ assertEquals(getTestName(), remoteBrokerConnection.getName());
+ assertEquals(server.getNodeID().toString(),
remoteBrokerConnection.getNodeId());
+
+ final AMQPFederationControl federationControl =
+ (AMQPFederationControl)
server.getManagementService().getResource(federationResourceName);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofSender().withName(getTestName())
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+
.withSource().withAddress(getTestName());
+
+ // Connect to remote as if an queue had demand and matched our
federation policy
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName(getTestName())
+ .withProperty(FEDERATION_POLICY_NAME.toString(),
"address-policy")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withAddress(getTestName())
+ .withCapabilities("topic")
+ .and()
+ .withTarget().and()
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final AMQPFederationRemotePolicyControlType remotePolicyControl =
(AMQPFederationRemotePolicyControlType)
+ server.getManagementService().getResource(policyResourceName);
+ AMQPFederationProducerControl producerControl =
(AMQPFederationProducerControl)
+ server.getManagementService().getResource(producerResourceName);
+
+ assertNotNull(federationControl);
+ assertNotNull(remotePolicyControl);
+ assertNotNull(producerControl);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ Wait.assertTrue(() ->
server.getManagementService().getResource(remoteBrokerConnectionName) == null,
5_000, 50);
+ Wait.assertTrue(() ->
server.getManagementService().getResource(federationResourceName) == null,
5_000, 50);
+ Wait.assertTrue(() ->
server.getManagementService().getResource(policyResourceName) == null, 5_000,
50);
+ Wait.assertTrue(() ->
server.getManagementService().getResource(producerResourceName) == null, 5_000,
50);
+ }
+
+ // Test registers and cleans up on connection dropped
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, serverNodeId,
brokerConnectionName, getTestName());
+ peer.connect("localhost", AMQP_PORT);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final AMQPFederationControl federationControl =
+ (AMQPFederationControl)
server.getManagementService().getResource(federationResourceName);
+
+ assertNotNull(federationControl);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofSender().withName(getTestName())
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+
.withSource().withAddress(getTestName());
+
+ // Connect to remote as if an queue had demand and matched our
federation policy
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName(getTestName())
+ .withProperty(FEDERATION_POLICY_NAME.toString(),
"address-policy")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withAddress(getTestName())
+ .withCapabilities("topic")
+ .and()
+ .withTarget().and()
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final AMQPFederationRemotePolicyControlType remotePolicyControl =
(AMQPFederationRemotePolicyControlType)
+ server.getManagementService().getResource(policyResourceName);
+ AMQPFederationProducerControl producerControl =
(AMQPFederationProducerControl)
+ server.getManagementService().getResource(producerResourceName);
Review Comment:
Make both final? ('remotePolicyControl' is declared final, but
'producerControl' is not.)
##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationManagementTest.java:
##########
@@ -1695,4 +1710,305 @@ public void
testRemoteQueueFederationTrackingCleanedUpOnBrokerConnectionStopped(
peer.close();
}
}
+
+ @Test
+ @Timeout(20)
+ public void
testRemoteBrokerRegistersAndRemovesRemoteAddressFederationBrokerConnectionInManagement()
throws Exception {
+ server.start();
+ server.addAddressInfo(new
AddressInfo(getTestName()).addRoutingType(RoutingType.MULTICAST));
+
+ final String serverNodeId = server.getNodeID().toString();
+ final String brokerConnectionName = getTestName();
+ final String remoteBrokerConnectionName =
ResourceNames.REMOTE_BROKER_CONNECTION + server.getNodeID() + "." +
getTestName();
+ final String federationResourceName =
+
AMQPFederationManagementSupport.getFederationTargetResourceName(serverNodeId,
brokerConnectionName, getTestName());
+ final String policyResourceName =
AMQPFederationManagementSupport.getFederationTargetPolicyResourceName(serverNodeId,
brokerConnectionName, getTestName(), "address-policy");
+ final String producerResourceName =
AMQPFederationManagementSupport.getFederationTargetAddressProducerResourceName(serverNodeId,
brokerConnectionName, getTestName(), "address-policy", getTestName());
+
+ // Test registers and cleans up on connection closed by remote
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, serverNodeId,
brokerConnectionName, getTestName());
+ peer.connect("localhost", AMQP_PORT);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final RemoteBrokerConnectionControl remoteBrokerConnection =
(RemoteBrokerConnectionControl)
+
server.getManagementService().getResource(remoteBrokerConnectionName);
+
+ assertNotNull(remoteBrokerConnection);
+ assertEquals(getTestName(), remoteBrokerConnection.getName());
+ assertEquals(server.getNodeID().toString(),
remoteBrokerConnection.getNodeId());
+
+ final AMQPFederationControl federationControl =
+ (AMQPFederationControl)
server.getManagementService().getResource(federationResourceName);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofSender().withName(getTestName())
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+
.withSource().withAddress(getTestName());
+
+ // Connect to remote as if an queue had demand and matched our
federation policy
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName(getTestName())
+ .withProperty(FEDERATION_POLICY_NAME.toString(),
"address-policy")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withAddress(getTestName())
+ .withCapabilities("topic")
+ .and()
+ .withTarget().and()
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final AMQPFederationRemotePolicyControlType remotePolicyControl =
(AMQPFederationRemotePolicyControlType)
+ server.getManagementService().getResource(policyResourceName);
+ AMQPFederationProducerControl producerControl =
(AMQPFederationProducerControl)
+ server.getManagementService().getResource(producerResourceName);
+
+ assertNotNull(federationControl);
+ assertNotNull(remotePolicyControl);
+ assertNotNull(producerControl);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ Wait.assertTrue(() ->
server.getManagementService().getResource(remoteBrokerConnectionName) == null,
5_000, 50);
+ Wait.assertTrue(() ->
server.getManagementService().getResource(federationResourceName) == null,
5_000, 50);
+ Wait.assertTrue(() ->
server.getManagementService().getResource(policyResourceName) == null, 5_000,
50);
+ Wait.assertTrue(() ->
server.getManagementService().getResource(producerResourceName) == null, 5_000,
50);
+ }
+
+ // Test registers and cleans up on connection dropped
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, serverNodeId,
brokerConnectionName, getTestName());
+ peer.connect("localhost", AMQP_PORT);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final AMQPFederationControl federationControl =
+ (AMQPFederationControl)
server.getManagementService().getResource(federationResourceName);
+
+ assertNotNull(federationControl);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofSender().withName(getTestName())
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+
.withSource().withAddress(getTestName());
+
+ // Connect to remote as if an queue had demand and matched our
federation policy
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_ADDRESS_RECEIVER.toString())
+ .withName(getTestName())
+ .withProperty(FEDERATION_POLICY_NAME.toString(),
"address-policy")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withAddress(getTestName())
+ .withCapabilities("topic")
+ .and()
+ .withTarget().and()
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final AMQPFederationRemotePolicyControlType remotePolicyControl =
(AMQPFederationRemotePolicyControlType)
+ server.getManagementService().getResource(policyResourceName);
+ AMQPFederationProducerControl producerControl =
(AMQPFederationProducerControl)
+ server.getManagementService().getResource(producerResourceName);
+
+ assertNotNull(federationControl);
+ assertNotNull(remotePolicyControl);
+ assertNotNull(producerControl);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ Wait.assertTrue(() ->
server.getManagementService().getResource(remoteBrokerConnectionName) == null,
5_000, 50);
+ Wait.assertTrue(() ->
server.getManagementService().getResource(federationResourceName) == null,
5_000, 50);
+ }
+ }
+
+ @Test
+ @Timeout(20)
+ public void
testRemoteBrokerRegistersAndRemovesRemoteQueueFederationBrokerConnectionInManagement()
throws Exception {
+ server.start();
+
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
+ final String serverNodeId = server.getNodeID().toString();
+ final String brokerConnectionName = getTestName();
+ final String remoteBrokerConnectionName =
ResourceNames.REMOTE_BROKER_CONNECTION + server.getNodeID() + "." +
getTestName();
+ final String federationResourceName =
+
AMQPFederationManagementSupport.getFederationTargetResourceName(serverNodeId,
brokerConnectionName, getTestName());
+ final String policyResourceName =
AMQPFederationManagementSupport.getFederationTargetPolicyResourceName(serverNodeId,
brokerConnectionName, getTestName(), "queue-policy");
+ final String producerResourceName =
AMQPFederationManagementSupport.getFederationTargetQueueProducerResourceName(serverNodeId,
brokerConnectionName, getTestName(), "queue-policy", getTestName() + "::" +
getTestName());
+
+ // Test registers and cleans up on connection closed by remote
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, serverNodeId,
brokerConnectionName, getTestName());
+ peer.connect("localhost", AMQP_PORT);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final RemoteBrokerConnectionControl remoteBrokerConnection =
(RemoteBrokerConnectionControl)
+
server.getManagementService().getResource(remoteBrokerConnectionName);
+
+ assertNotNull(remoteBrokerConnection);
+ assertEquals(getTestName(), remoteBrokerConnection.getName());
+ assertEquals(server.getNodeID().toString(),
remoteBrokerConnection.getNodeId());
+
+ final AMQPFederationControl federationControl =
+ (AMQPFederationControl)
server.getManagementService().getResource(federationResourceName);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofSender().withName(getTestName() + "::" +
getTestName())
+
.withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString())
+ .withSource().withAddress(getTestName()
+ "::" + getTestName());
+
+ // Connect to remote as if a queue had demand and matched our
federation policy
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_QUEUE_RECEIVER.toString())
+ .withName(getTestName() + "::" + getTestName())
+
.withProperty(FEDERATION_RECEIVER_PRIORITY.toString(),
DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT)
+ .withProperty(FEDERATION_POLICY_NAME.toString(),
"queue-policy")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withAddress(getTestName() + "::" +
getTestName())
+ .withCapabilities("queue")
+ .and()
+ .withTarget().and()
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final AMQPFederationRemotePolicyControlType remotePolicyControl =
(AMQPFederationRemotePolicyControlType)
+ server.getManagementService().getResource(policyResourceName);
+ AMQPFederationProducerControl producerControl =
(AMQPFederationProducerControl)
+ server.getManagementService().getResource(producerResourceName);
+
+ assertNotNull(federationControl);
+ assertNotNull(remotePolicyControl);
+ assertNotNull(producerControl);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+
+ Wait.assertTrue(() ->
server.getManagementService().getResource(remoteBrokerConnectionName) == null,
5_000, 50);
+ Wait.assertTrue(() ->
server.getManagementService().getResource(federationResourceName) == null,
5_000, 50);
+ Wait.assertTrue(() ->
server.getManagementService().getResource(policyResourceName) == null, 5_000,
50);
+ Wait.assertTrue(() ->
server.getManagementService().getResource(producerResourceName) == null, 5_000,
50);
+ }
+
+ // Test registers and cleans up on connection dropped
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ scriptFederationConnectToRemote(peer, serverNodeId,
brokerConnectionName, getTestName());
+ peer.connect("localhost", AMQP_PORT);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final AMQPFederationControl federationControl =
+ (AMQPFederationControl)
server.getManagementService().getResource(federationResourceName);
+
+ assertNotNull(federationControl);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofSender().withName(getTestName() + "::" +
getTestName())
+
.withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString())
+ .withSource().withAddress(getTestName()
+ "::" + getTestName());
+
+ // Connect to remote as if a queue had demand and matched our
federation policy
+ peer.remoteAttach().ofReceiver()
+
.withDesiredCapabilities(FEDERATION_QUEUE_RECEIVER.toString())
+ .withName(getTestName() + "::" + getTestName())
+
.withProperty(FEDERATION_RECEIVER_PRIORITY.toString(),
DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT)
+ .withProperty(FEDERATION_POLICY_NAME.toString(),
"queue-policy")
+ .withSenderSettleModeUnsettled()
+ .withReceivervSettlesFirst()
+ .withSource().withDurabilityOfNone()
+ .withExpiryPolicyOnLinkDetach()
+ .withAddress(getTestName() + "::" +
getTestName())
+ .withCapabilities("queue")
+ .and()
+ .withTarget().and()
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final AMQPFederationRemotePolicyControlType remotePolicyControl =
(AMQPFederationRemotePolicyControlType)
+ server.getManagementService().getResource(policyResourceName);
+ AMQPFederationProducerControl producerControl =
(AMQPFederationProducerControl)
+ server.getManagementService().getResource(producerResourceName);
Review Comment:
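Make both final? (`producerControl` is declared without `final`, while `remotePolicyControl` just above it already has it.)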
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact