This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 4ea55cd611 ARTEMIS-4568 Configuration reload for AMQP federation
broker connections
4ea55cd611 is described below
commit 4ea55cd611fde8664104158b42690958809a2716
Author: Timothy Bish <[email protected]>
AuthorDate: Tue Jan 16 15:03:34 2024 -0500
ARTEMIS-4568 Configuration reload for AMQP federation broker connections
Allows the configuration of AMQP Federation broker connections to be
updated and
reloaded. This allows for update, add or remove of AMQP federation broker
connections
as well as the basic AMQP sender and receiver broker connections. It checks
for and
ignores changes in AMQP broker connections that are performing Mirroring as
that
would lead to issues that can break mirroring.
---
.../amqp/broker/ProtonProtocolManagerFactory.java | 57 +-
.../amqp/connect/AMQPBrokerConnection.java | 8 +-
.../amqp/connect/AMQPBrokerConnectionManager.java | 178 ++++-
.../artemis/core/config/Configuration.java | 18 +
.../AMQPBrokerConnectConfiguration.java | 35 +-
.../AMQPBrokerConnectionElement.java | 32 +-
.../AMQPFederatedBrokerConnectionElement.java | 33 +
.../AMQPMirrorBrokerConnectionElement.java | 38 +
.../BrokerConnectConfiguration.java | 48 +-
.../core/config/impl/ConfigurationImpl.java | 13 +
.../core/remoting/server/RemotingService.java | 14 +
.../remoting/server/impl/RemotingServiceImpl.java | 7 +
.../artemis/core/server/ActiveMQServer.java | 10 +
.../artemis/core/server/BrokerConnection.java | 15 +
.../core/server/impl/ActiveMQServerImpl.java | 18 +-
.../protocol/AbstractProtocolManagerFactory.java | 4 +
.../spi/core/protocol/ProtocolManagerFactory.java | 15 +
.../AMQPBrokerConnectConfigurationTest.java | 149 ++++
.../AMQPBrokerConnectionElementTest.java | 104 +++
.../AMQPFederatedBrokerConnectionElementTest.java | 113 +++
.../AMQPMirrorBrokerConnectionElementTest.java | 126 ++++
pom.xml | 2 +-
.../AMQPFederationConfigurationReloadTest.java | 840 +++++++++++++++++++++
.../amqp/connect/AMQPFederationConnectTest.java | 1 +
.../connect/AMQPFederationQueuePolicyTest.java | 1 +
.../amqp/connect/AMQPMirrorConnectionTest.java | 166 ++++
.../management/ActiveMQServerControlTest.java | 6 +
.../reload-amqp-federated-addresses-reload.xml | 88 +++
.../resources/reload-amqp-federated-addresses.xml | 87 +++
.../test/resources/reload-amqp-federated-basic.xml | 77 ++
.../reload-amqp-federated-queues-reload.properties | 28 +
.../reload-amqp-federated-queues-reload.xml | 88 +++
.../reload-amqp-federated-queues.properties | 26 +
.../resources/reload-amqp-federated-queues.xml | 87 +++
34 files changed, 2489 insertions(+), 43 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
index e0b6a2b9e1..fa8fc0414b 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
import java.util.List;
import java.util.Map;
-
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Message;
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
@@ -41,6 +40,8 @@ public class ProtonProtocolManagerFactory extends
AbstractProtocolManagerFactory
private static String[] SUPPORTED_PROTOCOLS = {AMQP_PROTOCOL_NAME};
+ private AMQPBrokerConnectionManager brokerConnectionManager;
+
@Override
public Persister<Message>[] getPersister() {
@@ -72,14 +73,56 @@ public class ProtonProtocolManagerFactory extends
AbstractProtocolManagerFactory
return MODULE_NAME;
}
- /** AMQP integration with the broker on this case needs to be soft.
- * As the broker may choose to not load the AMQP Protocol */
+ /**
+ * AMQP integration with the broker on this case needs to be soft as the
+ * broker may choose to not load the AMQP Protocol module.
+ */
@Override
public void loadProtocolServices(ActiveMQServer server,
List<ActiveMQComponent> services) {
- List<AMQPBrokerConnectConfiguration> amqpServicesConfiguration =
server.getConfiguration().getAMQPConnection();
- if (amqpServicesConfiguration != null &&
amqpServicesConfiguration.size() > 0) {
- AMQPBrokerConnectionManager bridgeService = new
AMQPBrokerConnectionManager(this, amqpServicesConfiguration, server);
- services.add(bridgeService);
+ final List<AMQPBrokerConnectConfiguration> amqpServicesConfigurations =
server.getConfiguration().getAMQPConnection();
+
+ if (amqpServicesConfigurations != null &&
amqpServicesConfigurations.size() > 0) {
+ brokerConnectionManager = new AMQPBrokerConnectionManager(this,
amqpServicesConfigurations, server);
+ services.add(brokerConnectionManager);
+ }
+ }
+
+ /*
+ * Check if broker configuration of AMQP broker connections or other broker
+ * configuration related to protocol services has been updated and update
the
+ * protocol services accordingly.
+ */
+ @Override
+ public void updateProtocolServices(ActiveMQServer server,
List<ActiveMQComponent> services) throws Exception {
+ if (brokerConnectionManager == null) {
+ checkAddNewBrokerConnectionManager(server, services);
+ } else {
+ updateBrokerConnectionManager(server, services);
+ }
+ }
+
+ private void updateBrokerConnectionManager(ActiveMQServer server,
List<ActiveMQComponent> services) throws Exception {
+ final List<AMQPBrokerConnectConfiguration> amqpServicesConfigurations =
server.getConfiguration().getAMQPConnection();
+
+ brokerConnectionManager.updateConfiguration(amqpServicesConfigurations);
+
+ if (brokerConnectionManager.getConfiguredConnectionsCount() == 0) {
+ try {
+ brokerConnectionManager.stop();
+ } finally {
+ services.remove(brokerConnectionManager);
+ brokerConnectionManager = null;
+ }
+ }
+ }
+
+ private void checkAddNewBrokerConnectionManager(ActiveMQServer server,
List<ActiveMQComponent> services) throws Exception {
+ final List<AMQPBrokerConnectConfiguration> amqpServicesConfigurations =
server.getConfiguration().getAMQPConnection();
+
+ if (amqpServicesConfigurations != null &&
!amqpServicesConfigurations.isEmpty()) {
+ brokerConnectionManager = new AMQPBrokerConnectionManager(this,
amqpServicesConfigurations, server);
+ services.add(brokerConnectionManager);
+ brokerConnectionManager.start();
}
}
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index 246136b0b0..97a6287f26 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -151,7 +151,8 @@ public class AMQPBrokerConnection implements
ClientConnectionLifeCycleListener,
* the actual connection will come from the amqpConnection configuration*/
int port;
- public AMQPBrokerConnection(AMQPBrokerConnectionManager bridgeManager,
AMQPBrokerConnectConfiguration brokerConnectConfiguration,
+ public AMQPBrokerConnection(AMQPBrokerConnectionManager bridgeManager,
+ AMQPBrokerConnectConfiguration
brokerConnectConfiguration,
ProtonProtocolManager protonProtocolManager,
ActiveMQServer server,
NettyConnector bridgesConnector) {
@@ -174,6 +175,11 @@ public class AMQPBrokerConnection implements
ClientConnectionLifeCycleListener,
return "AMQP";
}
+ @Override
+ public AMQPBrokerConnectConfiguration getConfiguration() {
+ return brokerConnectConfiguration;
+ }
+
@Override
public boolean isStarted() {
return started;
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
index 4bca1772bb..b0fde0ce7f 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
@@ -16,16 +16,22 @@
*/
package org.apache.activemq.artemis.protocol.amqp.connect;
-import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.Lock;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import io.netty.channel.ChannelPipeline;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import
org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
@@ -41,6 +47,7 @@ import
org.apache.activemq.artemis.spi.core.remoting.SessionContext;
import org.apache.activemq.artemis.spi.core.remoting.TopologyResponseHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.lang.invoke.MethodHandles;
public class AMQPBrokerConnectionManager implements ActiveMQComponent,
ClientConnectionLifeCycleListener {
@@ -50,39 +57,160 @@ public class AMQPBrokerConnectionManager implements
ActiveMQComponent, ClientCon
private final ActiveMQServer server;
private volatile boolean started = false;
- private List<AMQPBrokerConnectConfiguration> amqpConnectionsConfig;
- private List<AMQPBrokerConnection> amqpBrokerConnectionList;
+ private final Map<String, AMQPBrokerConnectConfiguration>
amqpConnectionsConfig;
+ private final Map<String, AMQPBrokerConnection> amqpBrokerConnections = new
HashMap<>();
+
private ProtonProtocolManager protonProtocolManager;
public AMQPBrokerConnectionManager(ProtonProtocolManagerFactory factory,
List<AMQPBrokerConnectConfiguration> amqpConnectionsConfig, ActiveMQServer
server) {
- this.amqpConnectionsConfig = amqpConnectionsConfig;
+ this.amqpConnectionsConfig =
+ amqpConnectionsConfig.stream()
+ .collect(Collectors.toMap(c -> c.getName(),
Function.identity()));
+
this.server = server;
this.protonProtocolManagerFactory = factory;
}
+ public ProtonProtocolManagerFactory getProtocolManagerFactory() {
+ return protonProtocolManagerFactory;
+ }
+
@Override
public void start() throws Exception {
if (!started) {
started = true;
+
+ for (AMQPBrokerConnectConfiguration configuration :
amqpConnectionsConfig.values()) {
+ createBrokerConnection(configuration, configuration.isAutostart());
+ }
}
+ }
+
+ /**
+ * @return the number of configured broker connection configurations.
+ */
+ public int getConfiguredConnectionsCount() {
+ return amqpConnectionsConfig.size();
+ }
- amqpBrokerConnectionList = new ArrayList<>();
+ private void createBrokerConnection(AMQPBrokerConnectConfiguration
configuration, boolean start) throws Exception {
+ NettyConnectorFactory factory = new
NettyConnectorFactory().setServerConnector(true);
+ protonProtocolManager =
(ProtonProtocolManager)protonProtocolManagerFactory.createProtocolManager(server,
configuration.getTransportConfigurations().get(0).getExtraParams(), null,
null);
+ NettyConnector bridgesConnector =
(NettyConnector)factory.createConnector(configuration.getTransportConfigurations().get(0).getParams(),
null, this, server.getExecutorFactory().getExecutor(), server.getThreadPool(),
server.getScheduledPool(), new
ClientProtocolManagerWithAMQP(protonProtocolManager));
+ bridgesConnector.start();
- for (AMQPBrokerConnectConfiguration config : amqpConnectionsConfig) {
- NettyConnectorFactory factory = new
NettyConnectorFactory().setServerConnector(true);
- protonProtocolManager =
(ProtonProtocolManager)protonProtocolManagerFactory.createProtocolManager(server,
config.getTransportConfigurations().get(0).getExtraParams(), null, null);
- NettyConnector bridgesConnector =
(NettyConnector)factory.createConnector(config.getTransportConfigurations().get(0).getParams(),
null, this, server.getExecutorFactory().getExecutor(), server.getThreadPool(),
server.getScheduledPool(), new
ClientProtocolManagerWithAMQP(protonProtocolManager));
- bridgesConnector.start();
+ logger.debug("Connecting {}", configuration);
- logger.debug("Connecting {}", config);
+ AMQPBrokerConnection amqpBrokerConnection = new
AMQPBrokerConnection(this, configuration, protonProtocolManager, server,
bridgesConnector);
+ amqpBrokerConnections.put(configuration.getName(), amqpBrokerConnection);
+ server.registerBrokerConnection(amqpBrokerConnection);
+
+ if (start) {
+ amqpBrokerConnection.start();
+ }
+ }
- AMQPBrokerConnection amqpBrokerConnection = new
AMQPBrokerConnection(this, config, protonProtocolManager, server,
bridgesConnector);
- amqpBrokerConnectionList.add(amqpBrokerConnection);
- server.registerBrokerConnection(amqpBrokerConnection);
- if (config.isAutostart()) {
- amqpBrokerConnection.start();
+ /**
+ * Updates the configuration of any / all broker connections in the broker
connection manager
+ * based on updated broker configuration.
+ *
+ * @param configurations
+ * A list of broker connection configurations after a broker
configuration update.
+ *
+ * @throws Exception
+ */
+ @SuppressWarnings("unchecked")
+ public void updateConfiguration(List<AMQPBrokerConnectConfiguration>
configurations) throws Exception {
+ final List<AMQPBrokerConnectConfiguration> updatedConfigurations =
+ configurations != null ? configurations : Collections.EMPTY_LIST;
+
+ // Find any updated configurations and stop / and recreate as needed.
+ for (AMQPBrokerConnectConfiguration configuration :
updatedConfigurations) {
+ final AMQPBrokerConnectConfiguration previous =
amqpConnectionsConfig.put(configuration.getName(), configuration);
+
+ if (previous == null || !configuration.equals(previous)) {
+ // We don't currently allow updating broker connections with
mirror configurations
+ // as that could break the mirroring or leak resources until
properly implemented.
+ // This means that a mirror configuration and a federation
configuration on the
+ // same broker connection cannot update the federation portion.
+ //
+ // This does allow mirroring to be added to an existing broker
connection configuration
+ // once added though, the broker connection cannot be updated or
removed without a full
+ // restart of the broker.
+ if (previous != null && containsMirrorConfiguration(previous)) {
+ logger.info("Skipping update of broker connection {} which
contains a mirror " +
+ "configuration which are not reloadable.",
previous.getName());
+ continue;
+ }
+
+ // If this was an update and the connection is active meaning the
manager is
+ // started then we need to stop the old one if it exists before
attempting to
+ // start a new version with the new configuration.
+ final AMQPBrokerConnection connection =
amqpBrokerConnections.remove(configuration.getName());
+ // Defer to previous started state as we want to restore that if
it was already
+ // started and the configuration was updated.
+ final boolean autoStart = connection == null ?
+ configuration.isAutostart() : connection.isStarted() ||
configuration.isAutostart();
+
+ if (connection != null) {
+ try {
+ connection.stop();
+ } finally {
+ server.unregisterBrokerConnection(connection);
+ }
+ }
+
+ if (started) {
+ createBrokerConnection(configuration, autoStart);
+ }
}
}
+
+ // Find any removed configurations and remove them from the current set
+
+ final Map<String, AMQPBrokerConnectConfiguration> brokerConfigurations =
+ updatedConfigurations.stream()
+ .collect(Collectors.toMap(c -> c.getName(),
Function.identity()));
+
+ final List<AMQPBrokerConnectConfiguration> removedList =
amqpConnectionsConfig.values()
+ .stream()
+ .filter(c -> !brokerConfigurations.containsKey(c.getName()))
+ .collect(Collectors.toList());
+
+ for (AMQPBrokerConnectConfiguration toRemove : removedList) {
+ // We don't allow removal of broker connections with Mirror elements
as that leaves
+ // behind mirror controllers in the broker service that no longer
have a target that
+ // will ever be recovered. We could do work to remove the mirror
controller source but
+ // that code is not thread safe and would require more work which is
likely still ripe
+ // with issues.
+ if (containsMirrorConfiguration(toRemove)) {
+ logger.info("Skipping remove of broker connection {} which
contains a mirror " +
+ "configuration which are not reloadable.",
toRemove.getName());
+ continue;
+ }
+
+ amqpConnectionsConfig.remove(toRemove.getName());
+
+ final AMQPBrokerConnection connection =
amqpBrokerConnections.remove(toRemove.getName());
+
+ if (connection != null) {
+ try {
+ connection.stop();
+ } finally {
+ server.unregisterBrokerConnection(connection);
+ }
+ }
+ }
+ }
+
+ private boolean containsMirrorConfiguration(AMQPBrokerConnectConfiguration
configuration) {
+ for (AMQPBrokerConnectionElement element :
configuration.getConnectionElements()) {
+ if (AMQPBrokerConnectionAddressType.MIRROR.equals(element.getType()))
{
+ return true;
+ }
+ }
+
+ return false;
}
public void connected(NettyConnection nettyConnection, AMQPBrokerConnection
bridgeConnection) {
@@ -92,8 +220,16 @@ public class AMQPBrokerConnectionManager implements
ActiveMQComponent, ClientCon
public void stop() throws Exception {
if (started) {
started = false;
- for (AMQPBrokerConnection connection : amqpBrokerConnectionList) {
- connection.stop();
+ try {
+ for (AMQPBrokerConnection connection :
amqpBrokerConnections.values()) {
+ try {
+ connection.stop();
+ } finally {
+ server.unregisterBrokerConnection(connection);
+ }
+ }
+ } finally {
+ amqpBrokerConnections.clear();
}
}
}
@@ -109,7 +245,7 @@ public class AMQPBrokerConnectionManager implements
ActiveMQComponent, ClientCon
@Override
public void connectionDestroyed(Object connectionID, boolean failed) {
- for (AMQPBrokerConnection connection : amqpBrokerConnectionList) {
+ for (AMQPBrokerConnection connection : amqpBrokerConnections.values()) {
if (connection.getConnection() != null &&
connectionID.equals(connection.getConnection().getID())) {
connection.connectionDestroyed(connectionID, failed);
}
@@ -118,7 +254,7 @@ public class AMQPBrokerConnectionManager implements
ActiveMQComponent, ClientCon
@Override
public void connectionException(Object connectionID, ActiveMQException me) {
- for (AMQPBrokerConnection connection : amqpBrokerConnectionList) {
+ for (AMQPBrokerConnection connection : amqpBrokerConnections.values()) {
if (connection.getConnection() != null &&
connectionID.equals(connection.getConnection().getID())) {
connection.connectionException(connectionID, me);
}
@@ -127,7 +263,7 @@ public class AMQPBrokerConnectionManager implements
ActiveMQComponent, ClientCon
@Override
public void connectionReadyForWrites(Object connectionID, boolean ready) {
- for (AMQPBrokerConnection connection : amqpBrokerConnectionList) {
+ for (AMQPBrokerConnection connection : amqpBrokerConnections.values()) {
if (connection.getConnection().getID().equals(connectionID)) {
connection.connectionReadyForWrites(connectionID, ready);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 784f2b6c16..46259d940e 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -511,6 +511,24 @@ public interface Configuration {
List<AMQPBrokerConnectConfiguration> getAMQPConnection();
+ /**
+ * Quick set of all AMQP connection configurations in one call which will
clear all
+ * previously set or added broker configurations.
+ *
+ * @param amqpConnectionConfiugrations
+ * A list of AMQP broker connection configurations to assign to the
broker.
+ *
+ * @return this configuration object.
+ */
+ Configuration
setAMQPConnectionConfigurations(List<AMQPBrokerConnectConfiguration>
amqpConnectionConfiugrations);
+
+ /**
+ * Clears the current configuration object of all set or added AMQP
connection configuration elements.
+ *
+ * @return this configuration object.
+ */
+ Configuration clearAMQPConnectionConfigurations();
+
/**
* Returns the queues configured for this server.
*/
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPBrokerConnectConfiguration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPBrokerConnectConfiguration.java
index 9f32d7af90..3bf03e61fd 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPBrokerConnectConfiguration.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPBrokerConnectConfiguration.java
@@ -19,15 +19,19 @@ package
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
+
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import
org.apache.activemq.artemis.core.config.brokerConnectivity.BrokerConnectConfiguration;
import org.apache.activemq.artemis.uri.ConnectorTransportConfigurationParser;
/**
* This is a specific AMQP Broker Connection Configuration
- * */
+ */
public class AMQPBrokerConnectConfiguration extends BrokerConnectConfiguration
{
+ private static final long serialVersionUID = 8827214279279810938L;
+
List<TransportConfiguration> transportConfigurations;
List<AMQPBrokerConnectionElement> connectionElements = new ArrayList<>();
@@ -126,4 +130,33 @@ public class AMQPBrokerConnectConfiguration extends
BrokerConnectConfiguration {
super.setAutostart(autostart);
return this;
}
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = super.hashCode();
+ result = prime * result + Objects.hash(connectionElements,
transportConfigurations);
+
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (!super.equals(obj)) {
+ return false;
+ }
+
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+
+ final AMQPBrokerConnectConfiguration other =
(AMQPBrokerConnectConfiguration) obj;
+
+ return Objects.equals(connectionElements, other.connectionElements) &&
+ Objects.equals(transportConfigurations,
other.transportConfigurations);
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPBrokerConnectionElement.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPBrokerConnectionElement.java
index d0cc05d1a2..99b8a1af62 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPBrokerConnectionElement.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPBrokerConnectionElement.java
@@ -17,12 +17,16 @@
package org.apache.activemq.artemis.core.config.amqpBrokerConnectivity;
import java.io.Serializable;
+import java.util.Objects;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.postoffice.impl.AddressImpl;
public class AMQPBrokerConnectionElement implements Serializable {
+
+ private static final long serialVersionUID = 3653295602796835937L;
+
String name;
SimpleString matchAddress;
SimpleString queueName;
@@ -86,7 +90,6 @@ public class AMQPBrokerConnectionElement implements
Serializable {
return this;
}
-
public String getName() {
return name;
}
@@ -95,4 +98,31 @@ public class AMQPBrokerConnectionElement implements
Serializable {
this.name = name;
}
+ @Override
+ public int hashCode() {
+ // Don't pass the parent into hash or you will get a loop of hash code
computations.
+ return Objects.hash(matchAddress, name, queueName, type);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj == null) {
+ return false;
+ }
+
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+
+ final AMQPBrokerConnectionElement other = (AMQPBrokerConnectionElement)
obj;
+
+ return type == other.type &&
+ Objects.equals(name, other.name) &&
+ Objects.equals(matchAddress, other.matchAddress) &&
+ Objects.equals(queueName, other.queueName);
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPFederatedBrokerConnectionElement.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPFederatedBrokerConnectionElement.java
index 01c61c55b4..8e5975df8a 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPFederatedBrokerConnectionElement.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPFederatedBrokerConnectionElement.java
@@ -20,6 +20,7 @@ package
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
/**
@@ -163,4 +164,36 @@ public class AMQPFederatedBrokerConnectionElement extends
AMQPBrokerConnectionEl
public Map<String, Object> getProperties() {
return properties;
}
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = super.hashCode();
+ result = prime * result + Objects.hash(localAddressPolicies,
localQueuePolicies, properties, remoteAddressPolicies, remoteQueuePolicies);
+
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (!super.equals(obj)) {
+ return false;
+ }
+
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+
+ final AMQPFederatedBrokerConnectionElement other =
(AMQPFederatedBrokerConnectionElement) obj;
+
+ return Objects.equals(localAddressPolicies, other.localAddressPolicies)
&&
+ Objects.equals(localQueuePolicies, other.localQueuePolicies) &&
+ Objects.equals(properties, other.properties) &&
+ Objects.equals(remoteAddressPolicies,
other.remoteAddressPolicies) &&
+ Objects.equals(remoteQueuePolicies, other.remoteQueuePolicies);
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java
index 5a6c25785f..944099c2a0 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElement.java
@@ -18,11 +18,14 @@ package
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import org.apache.activemq.artemis.api.core.SimpleString;
public class AMQPMirrorBrokerConnectionElement extends
AMQPBrokerConnectionElement {
+ private static final long serialVersionUID = -6171198691682381614L;
+
boolean durable = true;
boolean queueCreation = true;
@@ -150,4 +153,39 @@ public class AMQPMirrorBrokerConnectionElement extends
AMQPBrokerConnectionEleme
public Map<String, Object> getProperties() {
return properties;
}
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = super.hashCode();
+ result = prime * result +
+ Objects.hash(addressFilter, durable, messageAcknowledgements,
mirrorSNF, queueCreation, queueRemoval, sync);
+
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (!super.equals(obj)) {
+ return false;
+ }
+
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+
+ final AMQPMirrorBrokerConnectionElement other =
(AMQPMirrorBrokerConnectionElement) obj;
+
+ return Objects.equals(addressFilter, other.addressFilter) &&
+ durable == other.durable &&
+ messageAcknowledgements == other.messageAcknowledgements &&
+ Objects.equals(mirrorSNF, other.mirrorSNF) &&
+ queueCreation == other.queueCreation &&
+ queueRemoval == other.queueRemoval &&
+ sync == other.sync;
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/brokerConnectivity/BrokerConnectConfiguration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/brokerConnectivity/BrokerConnectConfiguration.java
index 61170c353b..c6055e137a 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/brokerConnectivity/BrokerConnectConfiguration.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/brokerConnectivity/BrokerConnectConfiguration.java
@@ -17,12 +17,17 @@
package org.apache.activemq.artemis.core.config.brokerConnectivity;
import java.io.Serializable;
+import java.util.Objects;
-/** This is an extension point for outgoing broker configuration.
- * This is a new feature that at the time we introduced, is only being used
for AMQP.
- * Where the broker will create a connection towards another broker using a
specific protocol.
- * */
+/**
+ * This is base class for outgoing broker configuration types.
+ *
+ * This is a new feature that at the time we introduced, is only being used
for AMQP.
+ * Where the broker will create a connection towards another broker using a
specific
+ * protocol.
+ */
public abstract class BrokerConnectConfiguration implements Serializable {
+
private static final long serialVersionUID = 8026604526022462048L;
private String name;
@@ -40,8 +45,6 @@ public abstract class BrokerConnectConfiguration implements
Serializable {
public abstract void parseURI() throws Exception;
-
-
public int getReconnectAttempts() {
return reconnectAttempts;
}
@@ -51,7 +54,6 @@ public abstract class BrokerConnectConfiguration implements
Serializable {
return this;
}
-
public String getUser() {
return user;
}
@@ -61,7 +63,6 @@ public abstract class BrokerConnectConfiguration implements
Serializable {
return this;
}
-
public String getPassword() {
return password;
}
@@ -89,7 +90,6 @@ public abstract class BrokerConnectConfiguration implements
Serializable {
return this;
}
-
public String getName() {
return name;
}
@@ -107,4 +107,34 @@ public abstract class BrokerConnectConfiguration
implements Serializable {
this.autostart = autostart;
return this;
}
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(autostart, name, password, reconnectAttempts,
retryInterval, uri, user);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj == null) {
+ return false;
+ }
+
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+
+ final BrokerConnectConfiguration other = (BrokerConnectConfiguration)
obj;
+
+ return Objects.equals(name, other.name) &&
+ autostart == other.autostart &&
+ Objects.equals(password, other.password) &&
+ reconnectAttempts == other.reconnectAttempts &&
+ retryInterval == other.retryInterval &&
+ Objects.equals(uri, other.uri) &&
+ Objects.equals(user, other.user);
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 79e5f02387..f2c9b6ae6b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -1293,6 +1293,19 @@ public class ConfigurationImpl implements Configuration,
Serializable {
return this.amqpBrokerConnectConfigurations;
}
+ @Override
+ public Configuration
setAMQPConnectionConfigurations(List<AMQPBrokerConnectConfiguration>
amqpConnectionConfiugrations) {
+ this.amqpBrokerConnectConfigurations.clear();
+
this.amqpBrokerConnectConfigurations.addAll(amqpConnectionConfiugrations);
+ return this;
+ }
+
+ @Override
+ public Configuration clearAMQPConnectionConfigurations() {
+ this.amqpBrokerConnectConfigurations.clear();
+ return this;
+ }
+
@Override
public ConfigurationImpl clearClusterConfigurations() {
clusterConfigurations.clear();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
index 478146d733..e2a6ae8339 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java
@@ -130,5 +130,19 @@ public interface RemotingService {
void loadProtocolServices(List<ActiveMQComponent> protocolServices);
+ /**
+ * Provides an entry point for protocol services offered by this service
instance
+ * to react to configuration updates. If the service implementation does
not have any
+ * managed services or its services do not respond to updates it can ignore
this call.
+ * services added should be added to the provided protocolServices list,
and any removed
+ * should be found and removed from the list.
+ *
+ * @param protocolServices
+ * The list of protocol services known to the broker.
+ *
+ * @throws Exception if an error is thrown during the services updates.
+ */
+ void updateProtocolServices(List<ActiveMQComponent> protocolServices)
throws Exception;
+
void addConnectionEntry(Connection connection, ConnectionEntry entry);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index 0c2847157b..48a8da0cd6 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -547,6 +547,13 @@ public class RemotingServiceImpl implements
RemotingService, ServerConnectionLif
}
}
+ @Override
+ public void updateProtocolServices(List<ActiveMQComponent>
protocolServices) throws Exception {
+ for (ProtocolManagerFactory protocolManagerFactory :
protocolMap.values()) {
+ protocolManagerFactory.updateProtocolServices(this.server,
protocolServices);
+ }
+ }
+
// ServerConnectionLifeCycleListener implementation
-----------------------------------
private ProtocolManagerFactory getProtocolManager(String protocol) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 05a668c087..22f10da6d6 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -750,6 +750,16 @@ public interface ActiveMQServer extends ServiceComponent {
void registerBrokerConnection(BrokerConnection brokerConnection);
+ /**
+ * Removes the given broker connection from the tracked set of active broker
+ * connection entries. Unregistering the connection results in it being
forgotten
+ * and the caller is responsible for stopping the connection.
+ *
+ * @param brokerConnection
+ * The broker connection that should be forgotten.
+ */
+ void unregisterBrokerConnection(BrokerConnection brokerConnection);
+
void startBrokerConnection(String name) throws Exception;
void stopBrokerConnection(String name) throws Exception;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java
index c525deaa5a..7251eaad23 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java
@@ -16,8 +16,23 @@
*/
package org.apache.activemq.artemis.core.server;
+import
org.apache.activemq.artemis.core.config.brokerConnectivity.BrokerConnectConfiguration;
+
public interface BrokerConnection extends ActiveMQComponent {
+
+ /**
+ * @return the unique name of the broker connection
+ */
String getName();
+ /**
+ * @return the protocol that underlies the broker connection implementation.
+ */
String getProtocol();
+
+ /**
+ * @return the configuration that was used to create this broker connection.
+ */
+ BrokerConnectConfiguration getConfiguration();
+
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index bc77102532..7b3e9237d1 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1193,11 +1193,15 @@ public class ActiveMQServerImpl implements
ActiveMQServer {
brokerConnectionMap.put(brokerConnection.getName(), brokerConnection);
}
+ @Override
+ public void unregisterBrokerConnection(BrokerConnection brokerConnection) {
+ brokerConnectionMap.remove(brokerConnection.getName());
+ }
+
@Override
public void startBrokerConnection(String name) throws Exception {
BrokerConnection connection = getBrokerConnection(name);
connection.start();
-
}
protected BrokerConnection getBrokerConnection(String name) {
@@ -3604,6 +3608,14 @@ public class ActiveMQServerImpl implements
ActiveMQServer {
}
}
+ private void updateProtocolServices() throws Exception {
+ remotingService.updateProtocolServices(protocolServices);
+
+ for (ProtocolManagerFactory protocolManagerFactory :
protocolManagerFactories) {
+ protocolManagerFactory.updateProtocolServices(this, protocolServices);
+ }
+ }
+
/**
* This method exists for a possibility of test cases replacing the
FileStoreMonitor for an extension that would for instance pretend a disk full
on certain tests.
*/
@@ -4630,6 +4642,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
{
configuration.setQueueConfigs(config.getQueueConfigs());
configuration.setBridgeConfigurations(config.getBridgeConfigurations());
configuration.setConnectorConfigurations(config.getConnectorConfigurations());
+
configuration.setAMQPConnectionConfigurations(config.getAMQPConnection());
configurationReloadDeployed.set(false);
if (isActive()) {
configuration.parseProperties(propertiesFileUrl);
@@ -4756,12 +4769,13 @@ public class ActiveMQServerImpl implements
ActiveMQServer {
destroyBridge(existingBridgeName);
}
}
-
}
recoverStoredBridges();
recoverStoredConnectors();
+ ActiveMQServerLogger.LOGGER.reloadingConfiguration("protocol
services");
+ updateProtocolServices();
}
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManagerFactory.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManagerFactory.java
index 3802b86f09..b4f44cf010 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManagerFactory.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractProtocolManagerFactory.java
@@ -53,4 +53,8 @@ public abstract class AbstractProtocolManagerFactory<P
extends BaseInterceptor>
@Override
public void loadProtocolServices(ActiveMQServer server,
List<ActiveMQComponent> services) {
}
+
+ @Override
+ public void updateProtocolServices(ActiveMQServer server,
List<ActiveMQComponent> services) throws Exception {
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
index dcdbf27221..91a587e40f 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManagerFactory.java
@@ -60,4 +60,19 @@ public interface ProtocolManagerFactory<P extends
BaseInterceptor> {
String getModuleName();
void loadProtocolServices(ActiveMQServer server, List<ActiveMQComponent>
services);
+
+ /**
+ * Provides an entry point for the server to trigger the protocol manager
factory to
+ * update its protocol services based on updates to server configuration.
+ *
+ * @param server
+ * The service instance that has triggered this update
+ * @param services
+ * The protocol services that were previously registered (mutable).
+ *
+ * @throws Exception can throw an exception if an error occurs while
updating or adding
+ * protocol services from configuration updates.
+ */
+ void updateProtocolServices(ActiveMQServer server, List<ActiveMQComponent>
services) throws Exception;
+
}
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPBrokerConnectConfigurationTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPBrokerConnectConfigurationTest.java
new file mode 100644
index 0000000000..769bc4a800
--- /dev/null
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPBrokerConnectConfigurationTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.config.amqpBrokerConnectivity;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import org.junit.Test;
+
+/**
+ * Tests for the AMQPBrokerConnectConfiguration type
+ */
+public class AMQPBrokerConnectConfigurationTest {
+
+ @Test
+ public void testEquals() {
+ AMQPBrokerConnectConfiguration config1 = new
AMQPBrokerConnectConfiguration();
+ AMQPBrokerConnectConfiguration config2 = new
AMQPBrokerConnectConfiguration();
+
+ assertEquals(config1, config2);
+
+ // Name
+ config1.setName("test");
+ assertNotEquals(config1, config2);
+ config2.setName("test");
+ assertEquals(config1, config2);
+
+ // User
+ config1.setUser("test");
+ assertNotEquals(config1, config2);
+ config2.setUser("test");
+ assertEquals(config1, config2);
+
+ // Password
+ config1.setPassword("test");
+ assertNotEquals(config1, config2);
+ config2.setPassword("test");
+ assertEquals(config1, config2);
+
+ // Uri
+ config1.setUri("test");
+ assertNotEquals(config1, config2);
+ config2.setUri("test");
+ assertEquals(config1, config2);
+
+ // Reconnect Attempts
+ config1.setReconnectAttempts(1);
+ assertNotEquals(config1, config2);
+ config2.setReconnectAttempts(1);
+ assertEquals(config1, config2);
+
+ // Retry Interval
+ config1.setRetryInterval(1);
+ assertNotEquals(config1, config2);
+ config2.setRetryInterval(1);
+ assertEquals(config1, config2);
+
+ // Auto start
+ config1.setAutostart(false);
+ assertNotEquals(config1, config2);
+ config2.setAutostart(false);
+ assertEquals(config1, config2);
+
+ // Broker connection elements
+ AMQPBrokerConnectionElement element = new AMQPBrokerConnectionElement();
+
+ element.setName("test");
+
+ config1.addElement(element);
+ assertNotEquals(config1, config2);
+ config2.addElement(element);
+ assertEquals(config1, config2);
+ }
+
+ @Test
+ public void testHashCode() {
+ AMQPBrokerConnectConfiguration config1 = new
AMQPBrokerConnectConfiguration();
+ AMQPBrokerConnectConfiguration config2 = new
AMQPBrokerConnectConfiguration();
+
+ assertEquals(config1, config2);
+
+ // Name
+ config1.setName("test");
+ assertNotEquals(config1.hashCode(), config2.hashCode());
+ config2.setName("test");
+ assertEquals(config1.hashCode(), config2.hashCode());
+
+ // User
+ config1.setUser("test");
+ assertNotEquals(config1.hashCode(), config2.hashCode());
+ config2.setUser("test");
+ assertEquals(config1.hashCode(), config2.hashCode());
+
+ // Password
+ config1.setPassword("test");
+ assertNotEquals(config1.hashCode(), config2.hashCode());
+ config2.setPassword("test");
+ assertEquals(config1.hashCode(), config2.hashCode());
+
+ // Uri
+ config1.setUri("test");
+ assertNotEquals(config1.hashCode(), config2.hashCode());
+ config2.setUri("test");
+ assertEquals(config1.hashCode(), config2.hashCode());
+
+ // Reconnect Attempts
+ config1.setReconnectAttempts(1);
+ assertNotEquals(config1.hashCode(), config2.hashCode());
+ config2.setReconnectAttempts(1);
+ assertEquals(config1.hashCode(), config2.hashCode());
+
+ // Retry Interval
+ config1.setRetryInterval(1);
+ assertNotEquals(config1.hashCode(), config2.hashCode());
+ config2.setRetryInterval(1);
+ assertEquals(config1.hashCode(), config2.hashCode());
+
+ // Auto start
+ config1.setAutostart(false);
+ assertNotEquals(config1.hashCode(), config2.hashCode());
+ config2.setAutostart(false);
+ assertEquals(config1.hashCode(), config2.hashCode());
+
+ // Broker connection elements
+ AMQPBrokerConnectionElement element = new AMQPBrokerConnectionElement();
+
+ element.setName("test");
+
+ config1.addElement(element);
+ assertNotEquals(config1.hashCode(), config2.hashCode());
+ config2.addElement(element);
+ assertEquals(config1.hashCode(), config2.hashCode());
+ }
+}
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPBrokerConnectionElementTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPBrokerConnectionElementTest.java
new file mode 100644
index 0000000000..36848f46a1
--- /dev/null
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPBrokerConnectionElementTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.config.amqpBrokerConnectivity;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import org.junit.Test;
+
+/**
+ * Test basic API functionality of the AMQPBrokerConnectionElement type
+ */
+public class AMQPBrokerConnectionElementTest {
+
+ @Test
+ public void testEquals() {
+ AMQPBrokerConnectionElement element1 = new AMQPBrokerConnectionElement();
+ AMQPBrokerConnectionElement element2 = new AMQPBrokerConnectionElement();
+
+ assertEquals(element1, element2);
+
+ // Name
+ element1.setName("test");
+ assertNotEquals(element1, element2);
+ element2.setName("test");
+ assertEquals(element1, element2);
+
+ // Match Address
+ element1.setMatchAddress("test");
+ assertNotEquals(element1, element2);
+ element2.setMatchAddress("test");
+ assertEquals(element1, element2);
+
+ // Queue Name
+ element1.setQueueName("test");
+ assertNotEquals(element1, element2);
+ element2.setQueueName("test");
+ assertEquals(element1, element2);
+
+ // Type
+ element1.setType(AMQPBrokerConnectionAddressType.MIRROR);
+ assertNotEquals(element1, element2);
+ element2.setType(AMQPBrokerConnectionAddressType.MIRROR);
+ assertEquals(element1, element2);
+ }
+
+ @Test
+ public void testHashCode() {
+ AMQPBrokerConnectionElement element1 = new AMQPBrokerConnectionElement();
+ AMQPBrokerConnectionElement element2 = new AMQPBrokerConnectionElement();
+
+ assertEquals(element1.hashCode(), element2.hashCode());
+
+ // Name
+ element1.setName("test");
+ assertNotEquals(element1.hashCode(), element2.hashCode());
+ element2.setName("test");
+ assertEquals(element1.hashCode(), element2.hashCode());
+
+ // Match Address
+ element1.setMatchAddress("test");
+ assertNotEquals(element1.hashCode(), element2.hashCode());
+ element2.setMatchAddress("test");
+ assertEquals(element1.hashCode(), element2.hashCode());
+
+ // Queue Name
+ element1.setQueueName("test");
+ assertNotEquals(element1.hashCode(), element2.hashCode());
+ element2.setQueueName("test");
+ assertEquals(element1.hashCode(), element2.hashCode());
+
+ // Type
+ element1.setType(AMQPBrokerConnectionAddressType.MIRROR);
+ assertNotEquals(element1.hashCode(), element2.hashCode());
+ element2.setType(AMQPBrokerConnectionAddressType.MIRROR);
+ assertEquals(element1.hashCode(), element2.hashCode());
+
+ // Parent is not considered when checking equals or configurations would
+ // be unequal when loaded or reloaded.
+ final AMQPBrokerConnectConfiguration parent = new
AMQPBrokerConnectConfiguration();
+
+ element1.setParent(parent);
+ assertEquals(element1.hashCode(), element2.hashCode());
+ assertEquals(element1, element2);
+ element2.setParent(parent);
+ assertEquals(element1.hashCode(), element2.hashCode());
+ assertEquals(element1, element2);
+ }
+}
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPFederatedBrokerConnectionElementTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPFederatedBrokerConnectionElementTest.java
new file mode 100644
index 0000000000..9ea06cdc7c
--- /dev/null
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPFederatedBrokerConnectionElementTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.config.amqpBrokerConnectivity;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import org.junit.Test;
+
+/**
+ * Test for the API of AMQPFederatedBrokerConnectionElement
+ */
+public class AMQPFederatedBrokerConnectionElementTest {
+
+ @Test
+ public void testEquals() {
+ AMQPFederatedBrokerConnectionElement element1 = new
AMQPFederatedBrokerConnectionElement();
+ AMQPFederatedBrokerConnectionElement element2 = new
AMQPFederatedBrokerConnectionElement();
+
+ // Properties
+ element1.addProperty("test", "test");
+ assertNotEquals(element1, element2);
+ element2.addProperty("test", "test");
+ assertEquals(element1, element2);
+
+ AMQPFederationAddressPolicyElement addressPolicy = new
AMQPFederationAddressPolicyElement();
+ addressPolicy.addToIncludes("test");
+
+ // Local Address policy
+ element1.addLocalAddressPolicy(addressPolicy);
+ assertNotEquals(element1, element2);
+ element2.addLocalAddressPolicy(addressPolicy);
+ assertEquals(element1, element2);
+
+ // Remote Address policy
+ element1.addRemoteAddressPolicy(addressPolicy);
+ assertNotEquals(element1, element2);
+ element2.addRemoteAddressPolicy(addressPolicy);
+ assertEquals(element1, element2);
+
+ AMQPFederationQueuePolicyElement queuePolicy = new
AMQPFederationQueuePolicyElement();
+ queuePolicy.addToExcludes("test", "test");
+
+ // Local Queue policy
+ element1.addLocalQueuePolicy(queuePolicy);
+ assertNotEquals(element1, element2);
+ element2.addLocalQueuePolicy(queuePolicy);
+ assertEquals(element1, element2);
+
+ // Remote Queue policy
+ element1.addRemoteQueuePolicy(queuePolicy);
+ assertNotEquals(element1, element2);
+ element2.addRemoteQueuePolicy(queuePolicy);
+ assertEquals(element1, element2);
+ }
+
+ @Test
+ public void testHashCode() {
+ AMQPFederatedBrokerConnectionElement element1 = new
AMQPFederatedBrokerConnectionElement();
+ AMQPFederatedBrokerConnectionElement element2 = new
AMQPFederatedBrokerConnectionElement();
+
+ // Properties
+ element1.addProperty("test", "value");
+ assertNotEquals(element1.hashCode(), element2.hashCode());
+ element2.addProperty("test", "value");
+ assertEquals(element1.hashCode(), element2.hashCode());
+
+ AMQPFederationAddressPolicyElement addressPolicy = new
AMQPFederationAddressPolicyElement();
+ addressPolicy.addToIncludes("test");
+
+ // Local Address policy
+ element1.addLocalAddressPolicy(addressPolicy);
+ assertNotEquals(element1.hashCode(), element2.hashCode());
+ element2.addLocalAddressPolicy(addressPolicy);
+ assertEquals(element1.hashCode(), element2.hashCode());
+
+ // Remote Address policy
+ element1.addRemoteAddressPolicy(addressPolicy);
+ assertNotEquals(element1.hashCode(), element2.hashCode());
+ element2.addRemoteAddressPolicy(addressPolicy);
+ assertEquals(element1.hashCode(), element2.hashCode());
+
+ AMQPFederationQueuePolicyElement queuePolicy = new
AMQPFederationQueuePolicyElement();
+ queuePolicy.addToExcludes("test", "value");
+
+ // Local Queue policy
+ element1.addLocalQueuePolicy(queuePolicy);
+ assertNotEquals(element1.hashCode(), element2.hashCode());
+ element2.addLocalQueuePolicy(queuePolicy);
+ assertEquals(element1.hashCode(), element2.hashCode());
+
+ // Remote Queue policy
+ element1.addRemoteQueuePolicy(queuePolicy);
+ assertNotEquals(element1.hashCode(), element2.hashCode());
+ element2.addRemoteQueuePolicy(queuePolicy);
+ assertEquals(element1.hashCode(), element2.hashCode());
+ }
+}
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElementTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElementTest.java
new file mode 100644
index 0000000000..e4f99d49ee
--- /dev/null
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/amqpBrokerConnectivity/AMQPMirrorBrokerConnectionElementTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.config.amqpBrokerConnectivity;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.junit.Test;
+
+/**
+ * Test for the AMQPMirrorBrokerConnectionElement basic API
+ */
+public class AMQPMirrorBrokerConnectionElementTest {
+
+ @Test
+ public void testEquals() {
+ AMQPMirrorBrokerConnectionElement mirror1 = new
AMQPMirrorBrokerConnectionElement();
+ AMQPMirrorBrokerConnectionElement mirror2 = new
AMQPMirrorBrokerConnectionElement();
+
+ // Durable
+ mirror1.setDurable(!mirror1.isDurable());
+ assertNotEquals(mirror1, mirror2);
+ mirror2.setDurable(mirror1.isDurable());
+ assertEquals(mirror1, mirror2);
+
+ // Queue Create
+ mirror1.setQueueCreation(!mirror1.isQueueCreation());
+ assertNotEquals(mirror1, mirror2);
+ mirror2.setQueueCreation(mirror1.isQueueCreation());
+ assertEquals(mirror1, mirror2);
+
+ // Queue Remove
+ mirror1.setQueueRemoval(!mirror1.isQueueRemoval());
+ assertNotEquals(mirror1, mirror2);
+ mirror2.setQueueRemoval(mirror1.isQueueRemoval());
+ assertEquals(mirror1, mirror2);
+
+ // Message Acknowledgement
+ mirror1.setMessageAcknowledgements(!mirror1.isMessageAcknowledgements());
+ assertNotEquals(mirror1, mirror2);
+ mirror2.setMessageAcknowledgements(mirror1.isMessageAcknowledgements());
+ assertEquals(mirror1, mirror2);
+
+ // Sync
+ mirror1.setSync(!mirror1.isSync());
+ assertNotEquals(mirror1, mirror2);
+ mirror2.setSync(mirror1.isSync());
+ assertEquals(mirror1, mirror2);
+
+ // Mirror SNF
+ mirror1.setMirrorSNF(SimpleString.toSimpleString("test"));
+ assertNotEquals(mirror1, mirror2);
+ mirror2.setMirrorSNF(SimpleString.toSimpleString("test"));
+ assertEquals(mirror1, mirror2);
+
+ // Address Filter
+ mirror1.setAddressFilter("test");
+ assertNotEquals(mirror1, mirror2);
+ mirror2.setAddressFilter("test");
+ assertEquals(mirror1, mirror2);
+ }
+
+ @Test
+ public void testHashCode() {
+ AMQPMirrorBrokerConnectionElement mirror1 = new
AMQPMirrorBrokerConnectionElement();
+ AMQPMirrorBrokerConnectionElement mirror2 = new
AMQPMirrorBrokerConnectionElement();
+
+ // Durable
+ mirror1.setDurable(!mirror1.isDurable());
+ assertNotEquals(mirror1.hashCode(), mirror2.hashCode());
+ mirror2.setDurable(mirror1.isDurable());
+ assertEquals(mirror1.hashCode(), mirror2.hashCode());
+
+ // Queue Create
+ mirror1.setQueueCreation(!mirror1.isQueueCreation());
+ assertNotEquals(mirror1.hashCode(), mirror2.hashCode());
+ mirror2.setQueueCreation(mirror1.isQueueCreation());
+ assertEquals(mirror1.hashCode(), mirror2.hashCode());
+
+ // Queue Remove
+ mirror1.setQueueRemoval(!mirror1.isQueueRemoval());
+ assertNotEquals(mirror1.hashCode(), mirror2.hashCode());
+ mirror2.setQueueRemoval(mirror1.isQueueRemoval());
+ assertEquals(mirror1.hashCode(), mirror2.hashCode());
+
+ // Message Acknowledgement
+ mirror1.setMessageAcknowledgements(!mirror1.isMessageAcknowledgements());
+ assertNotEquals(mirror1.hashCode(), mirror2.hashCode());
+ mirror2.setMessageAcknowledgements(mirror1.isMessageAcknowledgements());
+ assertEquals(mirror1.hashCode(), mirror2.hashCode());
+
+ // Sync
+ mirror1.setSync(!mirror1.isSync());
+ assertNotEquals(mirror1.hashCode(), mirror2.hashCode());
+ mirror2.setSync(mirror1.isSync());
+ assertEquals(mirror1.hashCode(), mirror2.hashCode());
+
+ // Mirror SNF
+ mirror1.setMirrorSNF(SimpleString.toSimpleString("test"));
+ assertNotEquals(mirror1.hashCode(), mirror2.hashCode());
+ mirror2.setMirrorSNF(SimpleString.toSimpleString("test"));
+ assertEquals(mirror1.hashCode(), mirror2.hashCode());
+
+ // Address Filter
+ mirror1.setAddressFilter("test");
+ assertNotEquals(mirror1.hashCode(), mirror2.hashCode());
+ mirror2.setAddressFilter("test");
+ assertEquals(mirror1.hashCode(), mirror2.hashCode());
+ }
+}
diff --git a/pom.xml b/pom.xml
index 9ef58ded2c..d97519f7c2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,7 +132,7 @@
<!-- this is basically for tests -->
<netty-tcnative-version>2.0.61.Final</netty-tcnative-version>
<proton.version>0.34.1</proton.version>
- <protonj2.version>1.0.0-M17</protonj2.version>
+ <protonj2.version>1.0.0-M19</protonj2.version>
<slf4j.version>2.0.9</slf4j.version>
<log4j.version>2.21.1</log4j.version>
<qpid.jms.version>1.10.0</qpid.jms.version>
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConfigurationReloadTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConfigurationReloadTest.java
new file mode 100644
index 0000000000..a9c03f53fe
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConfigurationReloadTest.java
@@ -0,0 +1,840 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_DELAY;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.ADDRESS_AUTO_DELETE_MSG_COUNT;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_QUEUE_RECEIVER;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_RECEIVER_PRIORITY;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationPolicySupport.DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT;
+import static
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationPolicySupport.FEDERATED_ADDRESS_SOURCE_PROPERTIES;
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.containsString;
+import java.lang.invoke.MethodHandles;
+import java.net.URI;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import
org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
+import
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.integration.jms.RedeployTest;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test for reload handling in the broker connection federation implementation
+ */
+public class AMQPFederationConfigurationReloadTest extends
AmqpClientTestSupport {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @Override
+ protected String getConfiguredProtocols() {
+ return "AMQP,CORE";
+ }
+
+ @Override
+ protected ActiveMQServer createServer() throws Exception {
+ // Creates the broker used to make the outgoing connection. The port
passed is for
+ // that brokers acceptor. The test server connected to by the broker
binds to a random port.
+ return createServer(AMQP_PORT, false);
+ }
+
+ @Test(timeout = 20000)
+ public void testFederationConfigurationWithoutChangesIsIgnoredOnUpdate()
throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .respond()
+
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Test started, peer listening on: {}", remoteURI);
+
+ final AMQPFederationAddressPolicyElement receiveFromAddress = new
AMQPFederationAddressPolicyElement();
+ receiveFromAddress.setName("address-policy");
+ receiveFromAddress.addToIncludes("test");
+ receiveFromAddress.setAutoDelete(true);
+ receiveFromAddress.setAutoDeleteDelay(10_000L);
+ receiveFromAddress.setAutoDeleteMessageCount(-1L);
+
+ final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
+ element.setName("sample-federation");
+ element.addLocalAddressPolicy(receiveFromAddress);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration("test-address-federation",
"tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+ server.addAddressInfo(new
AddressInfo(SimpleString.toSimpleString("test"), RoutingType.MULTICAST));
+
+ final Map<String, Object> expectedSourceProperties = new HashMap<>();
+ expectedSourceProperties.put(ADDRESS_AUTO_DELETE, true);
+ expectedSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 10_000L);
+ expectedSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+
.withName(allOf(containsString("sample-federation"),
+ containsString("test"),
+ containsString("address-receiver"),
+
containsString(server.getNodeID().toString())))
+
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
expectedSourceProperties)
+ .respond()
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
+ peer.expectFlow().withLinkCredit(1000);
+
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer =
session.createConsumer(session.createTopic("test"));
+
+ final ProtonProtocolManagerFactory protocolFactory =
(ProtonProtocolManagerFactory)
+ server.getRemotingService().getProtocolFactoryMap().get("AMQP");
+ assertNotNull(protocolFactory);
+
+ final AMQPFederationAddressPolicyElement updatedReceiveFromAddress
= new AMQPFederationAddressPolicyElement();
+ updatedReceiveFromAddress.setName("address-policy");
+ updatedReceiveFromAddress.addToIncludes("test");
+ updatedReceiveFromAddress.setAutoDelete(true);
+ updatedReceiveFromAddress.setAutoDeleteDelay(10_000L);
+ updatedReceiveFromAddress.setAutoDeleteMessageCount(-1L);
+
+ final AMQPFederatedBrokerConnectionElement updatedElement = new
AMQPFederatedBrokerConnectionElement();
+ updatedElement.setName("sample-federation");
+ updatedElement.addLocalAddressPolicy(updatedReceiveFromAddress);
+
+ amqpConnection.getConnectionElements().clear();
+ amqpConnection.addElement(updatedElement); // This should be
equivalent to replacing the previous instance.
+
+ server.getConfiguration().getAMQPConnection().clear();
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+
+ protocolFactory.updateProtocolServices(server,
Collections.emptyList());
+
+ connection.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDetach().respond();
+
+ consumer.close();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+ }
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void
testFederationConnectsToSecondPeerWhenConfigurationUpdatedWithNewConnection()
throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .respond()
+
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Test started, peer listening on: {}", remoteURI);
+
+ final AMQPFederationAddressPolicyElement receiveFromAddress = new
AMQPFederationAddressPolicyElement();
+ receiveFromAddress.setName("address-policy");
+ receiveFromAddress.addToIncludes("test");
+ receiveFromAddress.setAutoDelete(true);
+ receiveFromAddress.setAutoDeleteDelay(10_000L);
+ receiveFromAddress.setAutoDeleteMessageCount(-1L);
+
+ final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
+ element.setName("sample-federation");
+ element.addLocalAddressPolicy(receiveFromAddress);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration("test-address-federation",
"tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+ server.addAddressInfo(new
AddressInfo(SimpleString.toSimpleString("test"), RoutingType.MULTICAST));
+
+ final Map<String, Object> expectedSourceProperties = new HashMap<>();
+ expectedSourceProperties.put(ADDRESS_AUTO_DELETE, true);
+ expectedSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 10_000L);
+ expectedSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+
.withName(allOf(containsString("sample-federation"),
+ containsString("test"),
+ containsString("address-receiver"),
+
containsString(server.getNodeID().toString())))
+
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
expectedSourceProperties)
+ .respond()
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
+ peer.expectFlow().withLinkCredit(1000);
+
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ session.createConsumer(session.createTopic("test"));
+
+ connection.start();
+
+ try (ProtonTestServer peer2 = new ProtonTestServer()) {
+ peer2.expectSASLAnonymousConnect();
+ peer2.expectOpen().respond();
+ peer2.expectBegin().respond();
+ peer2.expectAttach().ofSender()
+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .respond()
+
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+ peer2.start();
+
+ final URI remoteURI2 = peer2.getServerURI();
+ logger.info("Test peer 2 started, peer listening on: {}",
remoteURI2);
+
+ final ProtonProtocolManagerFactory protocolFactory =
(ProtonProtocolManagerFactory)
+
server.getRemotingService().getProtocolFactoryMap().get("AMQP");
+ assertNotNull(protocolFactory);
+
+ final AMQPFederationAddressPolicyElement
updatedReceiveFromAddress = new AMQPFederationAddressPolicyElement();
+ updatedReceiveFromAddress.setName("address-policy");
+ updatedReceiveFromAddress.addToIncludes("test");
+ updatedReceiveFromAddress.setAutoDelete(true);
+ updatedReceiveFromAddress.setAutoDeleteDelay(10_000L);
+ updatedReceiveFromAddress.setAutoDeleteMessageCount(-1L);
+
+ final AMQPFederatedBrokerConnectionElement updatedElement = new
AMQPFederatedBrokerConnectionElement();
+ updatedElement.setName("sample-federation-2");
+ updatedElement.addLocalAddressPolicy(updatedReceiveFromAddress);
+
+ final AMQPBrokerConnectConfiguration updatedAmqpConnection =
+ new
AMQPBrokerConnectConfiguration("test-address-federation-2", "tcp://" +
remoteURI2.getHost() + ":" + remoteURI2.getPort());
+ updatedAmqpConnection.setReconnectAttempts(0);// No reconnects
+ updatedAmqpConnection.addElement(updatedElement);
+
+
server.getConfiguration().addAMQPConnection(updatedAmqpConnection);
+
+ protocolFactory.updateProtocolServices(server,
Collections.emptyList());
+
+ peer2.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer2.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+
.withName(allOf(containsString("sample-federation"),
+ containsString("test"),
+
containsString("address-receiver"),
+
containsString(server.getNodeID().toString())))
+
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
expectedSourceProperties)
+ .respond()
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
+ peer2.expectFlow().withLinkCredit(1000);
+
+ peer2.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer2.close();
+ }
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+ }
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void
testFederationDisconnectsFromExistingPeerIfConfigurationRemoved() throws
Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .respond()
+
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Test started, peer listening on: {}", remoteURI);
+
+ final AMQPFederationAddressPolicyElement receiveFromAddress = new
AMQPFederationAddressPolicyElement();
+ receiveFromAddress.setName("address-policy");
+ receiveFromAddress.addToIncludes("test");
+ receiveFromAddress.setAutoDelete(true);
+ receiveFromAddress.setAutoDeleteDelay(10_000L);
+ receiveFromAddress.setAutoDeleteMessageCount(-1L);
+
+ final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
+ element.setName("sample-federation");
+ element.addLocalAddressPolicy(receiveFromAddress);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration("test-address-federation",
"tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+ server.addAddressInfo(new
AddressInfo(SimpleString.toSimpleString("test"), RoutingType.MULTICAST));
+
+ final Map<String, Object> expectedSourceProperties = new HashMap<>();
+ expectedSourceProperties.put(ADDRESS_AUTO_DELETE, true);
+ expectedSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 10_000L);
+ expectedSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+
.withName(allOf(containsString("sample-federation"),
+ containsString("test"),
+ containsString("address-receiver"),
+
containsString(server.getNodeID().toString())))
+
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
expectedSourceProperties)
+ .respond()
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
+ peer.expectFlow().withLinkCredit(1000);
+
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ session.createConsumer(session.createTopic("test"));
+
+ connection.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDetach().optional();
+ peer.expectClose().optional();
+ peer.expectConnectionToDrop();
+
+ final ProtonProtocolManagerFactory protocolFactory =
(ProtonProtocolManagerFactory)
+ server.getRemotingService().getProtocolFactoryMap().get("AMQP");
+ assertNotNull(protocolFactory);
+
+ server.getConfiguration().clearAMQPConnectionConfigurations();
+
+ protocolFactory.updateProtocolServices(server,
Collections.emptyList());
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ // Create more demand, no federation should be initiated
+ session.createConsumer(session.createTopic("test"));
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+ }
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testFederationUpdatesPolicyAndFederatesQueueInsteadOfAddress()
throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .respond()
+
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Test started, peer listening on: {}", remoteURI);
+
+ final AMQPFederationAddressPolicyElement receiveFromAddress = new
AMQPFederationAddressPolicyElement();
+ receiveFromAddress.setName("address-policy");
+ receiveFromAddress.addToIncludes("test");
+ receiveFromAddress.setAutoDelete(true);
+ receiveFromAddress.setAutoDeleteDelay(10_000L);
+ receiveFromAddress.setAutoDeleteMessageCount(-1L);
+
+ final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
+ element.setName("sample-federation");
+ element.addLocalAddressPolicy(receiveFromAddress);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration("test-federation", "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+ server.addAddressInfo(new
AddressInfo(SimpleString.toSimpleString("test"), RoutingType.MULTICAST));
+
+ final Map<String, Object> expectedSourceProperties = new HashMap<>();
+ expectedSourceProperties.put(ADDRESS_AUTO_DELETE, true);
+ expectedSourceProperties.put(ADDRESS_AUTO_DELETE_DELAY, 10_000L);
+ expectedSourceProperties.put(ADDRESS_AUTO_DELETE_MSG_COUNT, -1L);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
+
.withName(allOf(containsString("sample-federation"),
+ containsString("test"),
+ containsString("address-receiver"),
+
containsString(server.getNodeID().toString())))
+
.withProperty(FEDERATED_ADDRESS_SOURCE_PROPERTIES.toString(),
expectedSourceProperties)
+ .respond()
+
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
+ peer.expectFlow().withLinkCredit(1000);
+
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ session.createConsumer(session.createTopic("test"));
+ session.createConsumer(session.createQueue("queue"));
+
+ connection.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDetach().optional();
+ peer.expectClose().optional();
+ peer.expectConnectionToDrop();
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .respond()
+
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString())
+
.withName(allOf(containsString("sample-federation"),
+ containsString("queue::queue"),
+
containsString("queue-receiver"),
+
containsString(server.getNodeID().toString())))
+
.withProperty(FEDERATION_RECEIVER_PRIORITY.toString(),
DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT)
+ .respond()
+
.withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString());
+ peer.expectFlow().withLinkCredit(1000);
+
+ final ProtonProtocolManagerFactory protocolFactory =
(ProtonProtocolManagerFactory)
+ server.getRemotingService().getProtocolFactoryMap().get("AMQP");
+ assertNotNull(protocolFactory);
+
+ final AMQPFederationAddressPolicyElement updatedReceiveFromAddress
= new AMQPFederationAddressPolicyElement();
+ updatedReceiveFromAddress.setName("address-policy");
+ updatedReceiveFromAddress.addToIncludes("test");
+ updatedReceiveFromAddress.setAutoDelete(true);
+ updatedReceiveFromAddress.setAutoDeleteDelay(10_000L);
+ updatedReceiveFromAddress.setAutoDeleteMessageCount(-1L);
+
+ final AMQPFederationQueuePolicyElement updatedReceiveFromQueue =
new AMQPFederationQueuePolicyElement();
+ updatedReceiveFromQueue.setName("queue-policy");
+ updatedReceiveFromQueue.addToIncludes("*", "queue");
+
+ final AMQPFederatedBrokerConnectionElement updatedElement = new
AMQPFederatedBrokerConnectionElement();
+ updatedElement.setName("sample-federation");
+ updatedElement.addLocalQueuePolicy(updatedReceiveFromQueue);
+
+ final AMQPBrokerConnectConfiguration updatedAmqpConnection =
+ new AMQPBrokerConnectConfiguration("test-federation", "tcp://"
+ remoteURI.getHost() + ":" + remoteURI.getPort());
+ updatedAmqpConnection.setReconnectAttempts(0);// No reconnects
+ updatedAmqpConnection.addElement(updatedElement);
+
+ server.getConfiguration().getAMQPConnection().clear();
+ server.getConfiguration().addAMQPConnection(updatedAmqpConnection);
+
+ protocolFactory.updateProtocolServices(server,
Collections.emptyList());
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.close();
+ }
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testReloadAmqpConnectionAddressPolicyMatches() throws Exception
{
+ server.start();
+
+ final Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
+ final URL url1 =
RedeployTest.class.getClassLoader().getResource("reload-amqp-federated-addresses.xml");
+ final URL url2 =
RedeployTest.class.getClassLoader().getResource("reload-amqp-federated-addresses-reload.xml");
+
+ Files.copy(url1.openStream(), brokerXML);
+
+ final EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
+ embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
+ embeddedActiveMQ.start();
+
+ final ReusableLatch latch = new ReusableLatch(1);
+ final Runnable tick = latch::countDown;
+
+ embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+
+ final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:61617");
+ final ConnectionFactory serverCF =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection();
+ Connection serverConnection = serverCF.createConnection()) {
+
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Topic address1 = session.createTopic("address1");
+ final Topic address2 = session.createTopic("address2");
+ final MessageConsumer address1Consumer =
session.createConsumer(address1);
+ final MessageConsumer address2Consumer =
session.createConsumer(address2);
+
+ connection.start();
+
+ // Produces on the "remote" server which should federate to the
embedded "local" instance
+ final Session serverSession =
serverConnection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer address1Producer =
serverSession.createProducer(address1);
+ final MessageProducer address2Producer =
serverSession.createProducer(address2);
+
+ latch.await(10, TimeUnit.SECONDS);
+
+ // Demand on local address should trigger receiver on remote.
+ Wait.assertTrue(() ->
server.addressQuery(SimpleString.toSimpleString("address1")).isExists());
+
+ final TextMessage message = session.createTextMessage("test");
+
+ address1Producer.send(message);
+ address2Producer.send(message);
+
+ assertNotNull(address1Consumer.receive(5_000));
+ assertNull(address2Consumer.receiveNoWait());
+
+ Files.copy(url2.openStream(), brokerXML,
StandardCopyOption.REPLACE_EXISTING);
+ brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
+ latch.setCount(1);
+ embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+ latch.await(10, TimeUnit.SECONDS);
+
+ // Demand on local address should trigger receiver on remote.
+ Wait.assertTrue(() ->
server.addressQuery(SimpleString.toSimpleString("address2")).isExists());
+
+ // Should arrive on the original federated Address and the now added
address
+ // but we need to await the federation being setup and a binding
added.
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.toSimpleString("address2")).getQueueNames().size()
> 0);
+
+ address1Producer.send(message);
+ address2Producer.send(message);
+
+ assertNotNull(address1Consumer.receive(5_000));
+ assertNotNull(address2Consumer.receive(5_000));
+
+ } finally {
+ embeddedActiveMQ.stop();
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testReloadAmqpConnectionQueuePolicyMatches() throws Exception {
+ server.start();
+ server.createQueue(new
QueueConfiguration("queue1").setRoutingType(RoutingType.ANYCAST)
+ .setAddress("queue1")
+
.setAutoCreated(false));
+ server.createQueue(new
QueueConfiguration("queue2").setRoutingType(RoutingType.ANYCAST)
+ .setAddress("queue2")
+
.setAutoCreated(false));
+
+ final Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
+ final URL url1 =
RedeployTest.class.getClassLoader().getResource("reload-amqp-federated-queues.xml");
+ final URL url2 =
RedeployTest.class.getClassLoader().getResource("reload-amqp-federated-queues-reload.xml");
+
+ Files.copy(url1.openStream(), brokerXML);
+
+ final EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
+ embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
+ embeddedActiveMQ.start();
+
+ final ReusableLatch latch = new ReusableLatch(1);
+ final Runnable tick = latch::countDown;
+
+ embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+
+ final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:61617");
+ final ConnectionFactory serverCF =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ latch.await(10, TimeUnit.SECONDS);
+
+ try (Connection connection = factory.createConnection();
+ Connection serverConnection = serverCF.createConnection()) {
+
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Queue queue1 = session.createQueue("queue1");
+ final Queue queue2 = session.createQueue("queue2");
+ final MessageConsumer queue1Consumer = session.createConsumer(queue1);
+ final MessageConsumer queue2Consumer = session.createConsumer(queue2);
+
+ connection.start();
+
+ // Produces on the "remote" server which should federate to the
embedded "local" instance
+ final Session serverSession =
serverConnection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer queue1Producer =
serverSession.createProducer(queue1);
+ final MessageProducer queue2Producer =
serverSession.createProducer(queue2);
+
+ // Demand on local queue should trigger receiver on remote.
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.toSimpleString("queue1")).isExists());
+
+ final TextMessage message = session.createTextMessage("test");
+
+ queue1Producer.send(message);
+ queue2Producer.send(message);
+
+ // Should arrive on the original federated Queue but not the updated
Queue as it is not
+ // currently federated.
+
+ Wait.assertTrue(() ->
+
embeddedActiveMQ.getActiveMQServer().queueQuery(SimpleString.toSimpleString("queue1")).getMessageCount()
== 1);
+
+ assertNotNull(queue1Consumer.receiveNoWait());
+ assertNull(queue2Consumer.receiveNoWait());
+
+ Files.copy(url2.openStream(), brokerXML,
StandardCopyOption.REPLACE_EXISTING);
+ brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
+ latch.setCount(1);
+ embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+ latch.await(10, TimeUnit.SECONDS);
+
+ // Demand on local queue should trigger receiver on remote.
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.toSimpleString("queue2")).isExists());
+
+ // Should arrive on the original federated Queue and the updated
Queue as it is now federated
+
+ queue1Producer.send(message);
+
+ Wait.assertTrue(() ->
+
embeddedActiveMQ.getActiveMQServer().queueQuery(SimpleString.toSimpleString("queue1")).getMessageCount()
== 1);
+ Wait.assertTrue(() ->
+
embeddedActiveMQ.getActiveMQServer().queueQuery(SimpleString.toSimpleString("queue2")).getMessageCount()
== 1);
+
+ assertNotNull(queue1Consumer.receiveNoWait());
+ assertNotNull(queue2Consumer.receiveNoWait());
+
+ } finally {
+ embeddedActiveMQ.stop();
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testReloadAmqpConnectionAddressPolicyReplacedWithQueuePolicy()
throws Exception {
+ server.start();
+ server.createQueue(new
QueueConfiguration("queue1").setRoutingType(RoutingType.ANYCAST)
+ .setAddress("queue1")
+
.setAutoCreated(false));
+
+ final Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
+ final URL url1 =
RedeployTest.class.getClassLoader().getResource("reload-amqp-federated-addresses.xml");
+ final URL url2 =
RedeployTest.class.getClassLoader().getResource("reload-amqp-federated-queues.xml");
+
+ Files.copy(url1.openStream(), brokerXML);
+
+ final EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
+ embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
+ embeddedActiveMQ.start();
+
+ final ReusableLatch latch = new ReusableLatch(1);
+ final Runnable tick = latch::countDown;
+
+ embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+
+ final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:61617");
+ final ConnectionFactory serverCF =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection();
+ Connection serverConnection = serverCF.createConnection()) {
+
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Topic address = session.createTopic("address1");
+ final Queue queue = session.createQueue("queue1");
+ final MessageConsumer addressConsumer =
session.createConsumer(address);
+ final MessageConsumer queueConsumer = session.createConsumer(queue);
+
+ connection.start();
+
+ // Produces on the "remote" server which should federate to the
embedded "local" instance
+ final Session serverSession =
serverConnection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer addressProducer =
serverSession.createProducer(address);
+ final MessageProducer queueProducer =
serverSession.createProducer(queue);
+
+ latch.await(10, TimeUnit.SECONDS);
+
+ // Demand on local address should trigger receiver on remote.
+ Wait.assertTrue(() ->
server.addressQuery(SimpleString.toSimpleString("address1")).isExists());
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.toSimpleString("queue1")).isExists());
+
+ final TextMessage message = session.createTextMessage("test");
+
+ addressProducer.send(message);
+ queueProducer.send(message);
+
+ assertNotNull(addressConsumer.receive(5_000));
+ assertNull(queueConsumer.receiveNoWait());
+
+ Files.copy(url2.openStream(), brokerXML,
StandardCopyOption.REPLACE_EXISTING);
+ brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
+ latch.setCount(1);
+ embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+ latch.await(10, TimeUnit.SECONDS);
+
+ // The previously sent message should be federated to the embedded
broker
+ assertNotNull(queueConsumer.receive(5_000));
+
+ // The original address consumer should have gone away and not
returned when the federation
+ // connection was recreated.
+ Wait.assertTrue(() ->
server.bindingQuery(SimpleString.toSimpleString("address2")).getQueueNames().size()
== 0);
+
+ } finally {
+ embeddedActiveMQ.stop();
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void
testReloadAmqpConnectionQueuePolicyMatchesFromBrokerProperties() throws
Exception {
+ server.start();
+ server.createQueue(new
QueueConfiguration("queue1").setRoutingType(RoutingType.ANYCAST)
+ .setAddress("queue1")
+
.setAutoCreated(false));
+ server.createQueue(new
QueueConfiguration("queue2").setRoutingType(RoutingType.ANYCAST)
+ .setAddress("queue2")
+
.setAutoCreated(false));
+
+ final Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
+ final Path brokerProperties =
getTestDirfile().toPath().resolve("broker.properties");
+
+ final URL url1 =
RedeployTest.class.getClassLoader().getResource("reload-amqp-federated-basic.xml");
+
+ final URL propertiesUrl1 =
RedeployTest.class.getClassLoader().getResource("reload-amqp-federated-queues.properties");
+ final URL propertiesUrl2 =
RedeployTest.class.getClassLoader().getResource("reload-amqp-federated-queues-reload.properties");
+
+ Files.copy(url1.openStream(), brokerXML);
+ Files.copy(propertiesUrl1.openStream(), brokerProperties);
+
+ final EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
+ embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
+ embeddedActiveMQ.setPropertiesResourcePath(brokerProperties.toString());
+ embeddedActiveMQ.start();
+
+ final ReusableLatch latch = new ReusableLatch(1);
+ final Runnable tick = latch::countDown;
+
+ embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+
+ final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:61617");
+ final ConnectionFactory serverCF =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ latch.await(10, TimeUnit.SECONDS);
+
+ try (Connection connection = factory.createConnection();
+ Connection serverConnection = serverCF.createConnection()) {
+
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final Queue queue1 = session.createQueue("queue1");
+ final Queue queue2 = session.createQueue("queue2");
+ final MessageConsumer queue1Consumer = session.createConsumer(queue1);
+ final MessageConsumer queue2Consumer = session.createConsumer(queue2);
+
+ connection.start();
+
+ // Produces on the "remote" server which should federate to the
embedded "local" instance
+ final Session serverSession =
serverConnection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer queue1Producer =
serverSession.createProducer(queue1);
+ final MessageProducer queue2Producer =
serverSession.createProducer(queue2);
+
+ // Demand on local queue should trigger receiver on remote.
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.toSimpleString("queue1")).isExists());
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.toSimpleString("queue2")).isExists());
+
+ final TextMessage message = session.createTextMessage("test");
+
+ queue1Producer.send(message);
+ queue2Producer.send(message);
+
+ // Should get message sent to the single federated queue
+ Wait.assertTrue(() ->
+
embeddedActiveMQ.getActiveMQServer().queueQuery(SimpleString.toSimpleString("queue1")).getMessageCount()
== 1);
+
+ assertNotNull(queue1Consumer.receiveNoWait());
+ assertNull(queue2Consumer.receiveNoWait());
+
+ Files.copy(propertiesUrl2.openStream(), brokerProperties,
StandardCopyOption.REPLACE_EXISTING);
+ brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
+ latch.setCount(1);
+ embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+ latch.await(10, TimeUnit.SECONDS);
+
+ // Demand on local queue should trigger receiver on remote.
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.toSimpleString("queue1")).isExists());
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.toSimpleString("queue2")).isExists());
+
+ // Send another message to the originally federated queue
+ queue1Producer.send(message);
+
+ // Should arrive on the federated Queues now that the broker
configuration has been reloaded.
+ Wait.assertTrue(() ->
+
embeddedActiveMQ.getActiveMQServer().queueQuery(SimpleString.toSimpleString("queue1")).getMessageCount()
== 1);
+ Wait.assertTrue(() ->
+
embeddedActiveMQ.getActiveMQServer().queueQuery(SimpleString.toSimpleString("queue2")).getMessageCount()
== 1);
+
+ assertNotNull(queue1Consumer.receiveNoWait());
+ assertNotNull(queue2Consumer.receiveNoWait());
+
+ } finally {
+ embeddedActiveMQ.stop();
+ }
+ }
+}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
index ee7649d728..17e04fb675 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
@@ -224,6 +224,7 @@ public class AMQPFederationConnectTest extends
AmqpClientTestSupport {
peer.remoteDetach().withErrorCondition("amqp:unauthorized-access",
"Not authroized").queue();
peer.expectDetach().optional();
peer.expectClose().optional();
+ peer.expectConnectionToDrop();
// Broker reconnect and allow it to attach this time.
peer.expectSASLAnonymousConnect("PLAIN", "ANONYMOUS");
peer.expectOpen().respond();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
index b1f34fe8c7..68242342a3 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
@@ -947,6 +947,7 @@ public class AMQPFederationQueuePolicyTest extends
AmqpClientTestSupport {
peer.expectDetach().optional(); // Broker is not consistent on
sending the detach
peer.expectClose().optional();
+ peer.expectConnectionToDrop();
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java
index 0ebb794f0a..b6eeaf782a 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorConnectionTest.java
@@ -22,6 +22,7 @@ import static
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.TUNNE
import java.lang.invoke.MethodHandles;
import java.net.URI;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -41,6 +42,7 @@ import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBroker
import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import
org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import
org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
@@ -525,4 +527,168 @@ public class AMQPMirrorConnectionTest extends
AmqpClientTestSupport {
server.stop();
}
}
+
+ @Test(timeout = 20000)
+ public void testMirrorConnectionRemainsUnchangedAfterConfigurationUpdate()
throws Exception {
+ final Map<String, Object> brokerProperties = new HashMap<>();
+ brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(),
"Test-Broker");
+
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS");
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+
.withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
+ .withDesiredCapabilities("amq.mirror",
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString())
+ .respond()
+ .withOfferedCapabilities("amq.mirror",
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString())
+ .withPropertiesMap(brokerProperties);
+ peer.remoteFlow().withLinkCredit(10).queue();
+ peer.expectTransfer().accept(); // Notification address create
+ peer.expectTransfer().accept(); // Address create
+ peer.expectTransfer().accept(); // Queue create
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+ AMQPMirrorBrokerConnectionElement mirror = new
AMQPMirrorBrokerConnectionElement();
+ mirror.setQueueCreation(true);
+ mirror.setDurable(true);
+ mirror.setName("test");
+
+ AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration("testSimpleConnect", "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.setUser("user");
+ amqpConnection.setPassword("pass");
+ amqpConnection.addElement(mirror);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + BROKER_PORT_NUM);
+
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+ MessageConsumer consumer = session.createConsumer(queue);
+ MessageProducer producer = session.createProducer(queue);
+ TextMessage message = session.createTextMessage("test");
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ AMQPMirrorBrokerConnectionElement mirrorUpdated = new
AMQPMirrorBrokerConnectionElement();
+ mirrorUpdated.setQueueCreation(true);
+ mirrorUpdated.setDurable(false);
+ mirrorUpdated.setName("test");
+
+ AMQPBrokerConnectConfiguration amqpConnectionUpdated =
+ new AMQPBrokerConnectConfiguration("testSimpleConnect",
"tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnectionUpdated.setReconnectAttempts(0);// No reconnects
+ amqpConnectionUpdated.setUser("user1");
+ amqpConnectionUpdated.setPassword("pass1");
+ amqpConnectionUpdated.addElement(mirrorUpdated);
+
+ final ProtonProtocolManagerFactory protocolFactory =
(ProtonProtocolManagerFactory)
+ server.getRemotingService().getProtocolFactoryMap().get("AMQP");
+ assertNotNull(protocolFactory);
+
+ server.getConfiguration().clearAMQPConnectionConfigurations();
+ server.getConfiguration().addAMQPConnection(amqpConnectionUpdated);
+
+ protocolFactory.updateProtocolServices(server,
Collections.emptyList());
+
+ // Should be ignored as mirror connections cannot be updated.
+
+ peer.waitForScriptToComplete();
+ peer.expectTransfer().withMessageFormat(0).accept(); // Producer
Message
+
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ producer.send(message);
+
+ consumer.close();
+
+ peer.waitForScriptToComplete();
+ }
+
+ server.stop();
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testMirrorConnectionRemainsUnchangedAfterConfigurationRemoved()
throws Exception {
+ final Map<String, Object> brokerProperties = new HashMap<>();
+ brokerProperties.put(AMQPMirrorControllerSource.BROKER_ID.toString(),
"Test-Broker");
+
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLPlainConnect("user", "pass", "PLAIN", "ANONYMOUS");
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+
.withName(Matchers.startsWith("$ACTIVEMQ_ARTEMIS_MIRROR"))
+ .withDesiredCapabilities("amq.mirror",
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString())
+ .respond()
+ .withOfferedCapabilities("amq.mirror",
AmqpSupport.CORE_MESSAGE_TUNNELING_SUPPORT.toString())
+ .withPropertiesMap(brokerProperties);
+ peer.remoteFlow().withLinkCredit(10).queue();
+ peer.expectTransfer().accept(); // Notification address create
+ peer.expectTransfer().accept(); // Address create
+ peer.expectTransfer().accept(); // Queue create
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+ AMQPMirrorBrokerConnectionElement mirror = new
AMQPMirrorBrokerConnectionElement();
+ mirror.setQueueCreation(true);
+ mirror.setDurable(true);
+ mirror.setName("test");
+
+ AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration("testSimpleConnect", "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.setUser("user");
+ amqpConnection.setPassword("pass");
+ amqpConnection.addElement(mirror);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + BROKER_PORT_NUM);
+
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+ MessageConsumer consumer = session.createConsumer(queue);
+ MessageProducer producer = session.createProducer(queue);
+ TextMessage message = session.createTextMessage("test");
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ final ProtonProtocolManagerFactory protocolFactory =
(ProtonProtocolManagerFactory)
+ server.getRemotingService().getProtocolFactoryMap().get("AMQP");
+ assertNotNull(protocolFactory);
+
+ // Clear and update is essentially a remove of old configuration
+ server.getConfiguration().clearAMQPConnectionConfigurations();
+
+ protocolFactory.updateProtocolServices(server,
Collections.emptyList());
+
+ // Should be ignored as mirror connections cannot be updated.
+
+ peer.waitForScriptToComplete();
+ peer.expectTransfer().withMessageFormat(0).accept(); // Producer
Message
+
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ producer.send(message);
+
+ consumer.close();
+
+ peer.waitForScriptToComplete();
+ }
+
+ server.stop();
+ }
+ }
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
index 6ac33e5806..5c7d34c7a3 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
@@ -77,6 +77,7 @@ import
org.apache.activemq.artemis.core.client.impl.ClientSessionImpl;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
+import
org.apache.activemq.artemis.core.config.brokerConnectivity.BrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
import org.apache.activemq.artemis.core.management.impl.view.ConsumerField;
import org.apache.activemq.artemis.core.management.impl.view.ProducerField;
@@ -5923,6 +5924,11 @@ public class ActiveMQServerControlTest extends
ManagementTestBase {
public boolean isStarted() {
return started;
}
+
+ @Override
+ public BrokerConnectConfiguration getConfiguration() {
+ return null;
+ }
}
Fake fake = new Fake("fake" +
UUIDGenerator.getInstance().generateStringUUID());
server.registerBrokerConnection(fake);
diff --git
a/tests/integration-tests/src/test/resources/reload-amqp-federated-addresses-reload.xml
b/tests/integration-tests/src/test/resources/reload-amqp-federated-addresses-reload.xml
new file mode 100644
index 0000000000..19ca26c00f
--- /dev/null
+++
b/tests/integration-tests/src/test/resources/reload-amqp-federated-addresses-reload.xml
@@ -0,0 +1,88 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:xi="http://www.w3.org/2001/XInclude"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <security-enabled>false</security-enabled>
+ <persistence-enabled>false</persistence-enabled>
+
<configuration-file-refresh-period>100</configuration-file-refresh-period>
+
+ <acceptors>
+ <acceptor
name="artemis">tcp://0.0.0.0:61617?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
+ </acceptors>
+
+ <broker-connections>
+ <amqp-connection uri="tcp://localhost:5672" name="federation-test"
reconnect-attempts="-1" retry-interval="100" user="B" password="B">
+ <federation>
+ <local-address-policy name="address-policy">
+ <include address-match="address1" />
+ <include address-match="address2" />
+ </local-address-policy>
+ </federation>
+ </amqp-connection>
+ </broker-connections>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ" />
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue" />
+ </anycast>
+ </address>
+ </addresses>
+ </core>
+</configuration>
diff --git
a/tests/integration-tests/src/test/resources/reload-amqp-federated-addresses.xml
b/tests/integration-tests/src/test/resources/reload-amqp-federated-addresses.xml
new file mode 100644
index 0000000000..6cc2f2087d
--- /dev/null
+++
b/tests/integration-tests/src/test/resources/reload-amqp-federated-addresses.xml
@@ -0,0 +1,87 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:xi="http://www.w3.org/2001/XInclude"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <security-enabled>false</security-enabled>
+ <persistence-enabled>false</persistence-enabled>
+
<configuration-file-refresh-period>100</configuration-file-refresh-period>
+
+ <acceptors>
+ <acceptor
name="artemis">tcp://0.0.0.0:61617?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
+ </acceptors>
+
+ <broker-connections>
+ <amqp-connection uri="tcp://localhost:5672" name="federation-test"
reconnect-attempts="-1" retry-interval="100" user="B" password="B">
+ <federation>
+ <local-address-policy name="address-policy">
+ <include address-match="address1" />
+ </local-address-policy>
+ </federation>
+ </amqp-connection>
+ </broker-connections>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ" />
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue" />
+ </anycast>
+ </address>
+ </addresses>
+ </core>
+</configuration>
diff --git
a/tests/integration-tests/src/test/resources/reload-amqp-federated-basic.xml
b/tests/integration-tests/src/test/resources/reload-amqp-federated-basic.xml
new file mode 100644
index 0000000000..38f6f77992
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/reload-amqp-federated-basic.xml
@@ -0,0 +1,77 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:xi="http://www.w3.org/2001/XInclude"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <security-enabled>false</security-enabled>
+ <persistence-enabled>false</persistence-enabled>
+
<configuration-file-refresh-period>100</configuration-file-refresh-period>
+
+ <acceptors>
+ <acceptor
name="artemis">tcp://0.0.0.0:61617?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
+ </acceptors>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ" />
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue" />
+ </anycast>
+ </address>
+ </addresses>
+ </core>
+</configuration>
diff --git
a/tests/integration-tests/src/test/resources/reload-amqp-federated-queues-reload.properties
b/tests/integration-tests/src/test/resources/reload-amqp-federated-queues-reload.properties
new file mode 100644
index 0000000000..b529fe2721
--- /dev/null
+++
b/tests/integration-tests/src/test/resources/reload-amqp-federated-queues-reload.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+AMQPConnections.target.uri=tcp://localhost:5672
+AMQPConnections.target.retryInterval=100
+AMQPConnections.target.reconnectAttempts=-1
+AMQPConnections.target.user=user
+AMQPConnections.target.password=password
+AMQPConnections.target.autostart=true
+AMQPConnections.target.federations.abc.type=FEDERATION
+AMQPConnections.target.federations.abc.localQueuePolicies.policy1.includes.m1.addressMatch=#
+AMQPConnections.target.federations.abc.localQueuePolicies.policy1.includes.m1.queueMatch=queue1
+AMQPConnections.target.federations.abc.localQueuePolicies.policy1.includes.m2.addressMatch=#
+AMQPConnections.target.federations.abc.localQueuePolicies.policy1.includes.m2.queueMatch=queue2
diff --git
a/tests/integration-tests/src/test/resources/reload-amqp-federated-queues-reload.xml
b/tests/integration-tests/src/test/resources/reload-amqp-federated-queues-reload.xml
new file mode 100644
index 0000000000..e2588babdb
--- /dev/null
+++
b/tests/integration-tests/src/test/resources/reload-amqp-federated-queues-reload.xml
@@ -0,0 +1,88 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:xi="http://www.w3.org/2001/XInclude"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <security-enabled>false</security-enabled>
+ <persistence-enabled>false</persistence-enabled>
+
<configuration-file-refresh-period>100</configuration-file-refresh-period>
+
+ <acceptors>
+ <acceptor
name="artemis">tcp://0.0.0.0:61617?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
+ </acceptors>
+
+ <broker-connections>
+ <amqp-connection uri="tcp://localhost:5672" name="federation-test"
reconnect-attempts="-1" retry-interval="100" user="B" password="B">
+ <federation>
+ <local-queue-policy name="queue-policy">
+ <include address-match="*" queue-match="queue1" />
+ <include address-match="*" queue-match="queue2" />
+ </local-queue-policy>
+ </federation>
+ </amqp-connection>
+ </broker-connections>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ" />
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue" />
+ </anycast>
+ </address>
+ </addresses>
+ </core>
+</configuration>
diff --git
a/tests/integration-tests/src/test/resources/reload-amqp-federated-queues.properties
b/tests/integration-tests/src/test/resources/reload-amqp-federated-queues.properties
new file mode 100644
index 0000000000..717417af51
--- /dev/null
+++
b/tests/integration-tests/src/test/resources/reload-amqp-federated-queues.properties
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+AMQPConnections.target.uri=tcp://localhost:5672
+AMQPConnections.target.retryInterval=100
+AMQPConnections.target.reconnectAttempts=-1
+AMQPConnections.target.user=user
+AMQPConnections.target.password=password
+AMQPConnections.target.autostart=true
+AMQPConnections.target.federations.abc.type=FEDERATION
+AMQPConnections.target.federations.abc.localQueuePolicies.policy1.includes.m1.addressMatch=#
+AMQPConnections.target.federations.abc.localQueuePolicies.policy1.includes.m1.queueMatch=queue1
diff --git
a/tests/integration-tests/src/test/resources/reload-amqp-federated-queues.xml
b/tests/integration-tests/src/test/resources/reload-amqp-federated-queues.xml
new file mode 100644
index 0000000000..0759c6eb53
--- /dev/null
+++
b/tests/integration-tests/src/test/resources/reload-amqp-federated-queues.xml
@@ -0,0 +1,87 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:xi="http://www.w3.org/2001/XInclude"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <security-enabled>false</security-enabled>
+ <persistence-enabled>false</persistence-enabled>
+
<configuration-file-refresh-period>100</configuration-file-refresh-period>
+
+ <acceptors>
+ <acceptor
name="artemis">tcp://0.0.0.0:61617?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
+ </acceptors>
+
+ <broker-connections>
+ <amqp-connection uri="tcp://localhost:5672" name="federation-test"
reconnect-attempts="-1" retry-interval="100" user="B" password="B">
+ <federation>
+ <local-queue-policy name="queue-policy">
+ <include address-match="*" queue-match="queue1" />
+ </local-queue-policy>
+ </federation>
+ </amqp-connection>
+ </broker-connections>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ" />
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue" />
+ </anycast>
+ </address>
+ </addresses>
+ </core>
+</configuration>