This is an automated email from the ASF dual-hosted git repository.
brusdev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 4b717d1634 ARTEMIS-5889 Support reload of connection router
configuration
4b717d1634 is described below
commit 4b717d163432a36ec940ff6960f4ee2c8fc101ea
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Feb 19 17:53:45 2026 -0500
ARTEMIS-5889 Support reload of connection router configuration
Support adding, updating, or removing connection routers via
configuration updates, made either in the XML configuration or via broker properties.
---
.../core/config/routing/CacheConfiguration.java | 22 +-
.../routing/ConnectionRouterConfiguration.java | 25 ++
.../config/routing/NamedPropertyConfiguration.java | 20 ++
.../core/config/routing/PoolConfiguration.java | 37 +-
.../core/server/impl/ActiveMQServerImpl.java | 7 +-
.../core/server/routing/ConnectionRouter.java | 170 ++++++----
.../server/routing/ConnectionRouterManager.java | 119 +++++--
.../routing/ConnectionRouterManagerTest.java | 375 ++++++++++++++++++++-
.../core/server/routing/ConnectionRouterTest.java | 37 +-
docs/user-manual/config-reload.adoc | 3 +
.../tests/integration/jms/RedeployTest.java | 303 ++++++++++++++++-
.../resources/reload-connection-router-updated.xml | 47 +++
.../test/resources/reload-connection-router.xml | 44 +++
13 files changed, 1103 insertions(+), 106 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/routing/CacheConfiguration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/routing/CacheConfiguration.java
index ba06ed2139..7566aeb551 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/routing/CacheConfiguration.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/routing/CacheConfiguration.java
@@ -17,9 +17,11 @@
package org.apache.activemq.artemis.core.config.routing;
import java.io.Serializable;
+import java.util.Objects;
public class CacheConfiguration implements Serializable {
- private boolean persisted = false;
+
+ private boolean persisted;
private int timeout = 0;
@@ -43,4 +45,22 @@ public class CacheConfiguration implements Serializable {
this.timeout = timeout;
return this;
}
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(persisted, timeout);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj instanceof CacheConfiguration other) {
+ return persisted == other.persisted && timeout == other.timeout;
+ }
+
+ return false;
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/routing/ConnectionRouterConfiguration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/routing/ConnectionRouterConfiguration.java
index 2b6af82ac1..71968e0d78 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/routing/ConnectionRouterConfiguration.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/routing/ConnectionRouterConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.config.routing;
import org.apache.activemq.artemis.core.server.routing.KeyType;
import java.io.Serializable;
+import java.util.Objects;
public class ConnectionRouterConfiguration implements Serializable {
@@ -92,4 +93,28 @@ public class ConnectionRouterConfiguration implements
Serializable {
this.poolConfiguration = poolConfiguration;
return this;
}
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(cacheConfiguration, keyFilter, keyType,
localTargetFilter, name, policyConfiguration, poolConfiguration);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj instanceof ConnectionRouterConfiguration other) {
+ return Objects.equals(cacheConfiguration, other.cacheConfiguration) &&
+ Objects.equals(keyFilter, other.keyFilter) &&
+ keyType == other.keyType &&
+ Objects.equals(localTargetFilter, other.localTargetFilter) &&
+ Objects.equals(name, other.name) &&
+ Objects.equals(policyConfiguration, other.policyConfiguration)
&&
+ Objects.equals(poolConfiguration, other.poolConfiguration);
+ }
+
+ return false;
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/routing/NamedPropertyConfiguration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/routing/NamedPropertyConfiguration.java
index 96fc9d3126..c8af5f1148 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/routing/NamedPropertyConfiguration.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/routing/NamedPropertyConfiguration.java
@@ -18,8 +18,10 @@ package org.apache.activemq.artemis.core.config.routing;
import java.io.Serializable;
import java.util.Map;
+import java.util.Objects;
public class NamedPropertyConfiguration implements Serializable {
+
private String name;
private Map<String, String> properties;
@@ -41,4 +43,22 @@ public class NamedPropertyConfiguration implements
Serializable {
this.properties = properties;
return this;
}
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, properties);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj instanceof NamedPropertyConfiguration other) {
+ return Objects.equals(name, other.name) && Objects.equals(properties,
other.properties);
+ }
+
+ return false;
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/routing/PoolConfiguration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/routing/PoolConfiguration.java
index 285b5b1130..d6a55e6e85 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/routing/PoolConfiguration.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/routing/PoolConfiguration.java
@@ -18,19 +18,21 @@ package org.apache.activemq.artemis.core.config.routing;
import java.io.Serializable;
import java.util.List;
+import java.util.Objects;
public class PoolConfiguration implements Serializable {
+
private String username;
private String password;
- private boolean localTargetEnabled = false;
+ private boolean localTargetEnabled;
- private String clusterConnection = null;
+ private String clusterConnection;
- private List<String> staticConnectors = null;
+ private List<String> staticConnectors;
- private String discoveryGroupName = null;
+ private String discoveryGroupName;
private int checkPeriod = 5000;
@@ -118,4 +120,31 @@ public class PoolConfiguration implements Serializable {
this.discoveryGroupName = discoveryGroupName;
return this;
}
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(checkPeriod, clusterConnection, discoveryGroupName,
localTargetEnabled, password, quorumSize, quorumTimeout, staticConnectors,
+ username);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj instanceof PoolConfiguration other) {
+ return checkPeriod == other.checkPeriod &&
+ Objects.equals(clusterConnection, other.clusterConnection) &&
+ Objects.equals(discoveryGroupName, other.discoveryGroupName) &&
+ localTargetEnabled == other.localTargetEnabled &&
+ Objects.equals(password, other.password) &&
+ quorumSize == other.quorumSize &&
+ quorumTimeout == other.quorumTimeout &&
+ Objects.equals(staticConnectors, other.staticConnectors) &&
+ Objects.equals(username, other.username);
+ }
+
+ return false;
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index a9aa48b765..e687c8de5b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -3398,9 +3398,9 @@ public class ActiveMQServerImpl implements ActiveMQServer
{
federationManager.deploy();
- connectionRouterManager = new ConnectionRouterManager(configuration,
this, scheduledPool);
+ connectionRouterManager = new ConnectionRouterManager(this,
scheduledPool);
- connectionRouterManager.deploy();
+ connectionRouterManager.deploy(configuration);
remotingService = new RemotingServiceImpl(clusterManager, configuration,
this, managementService, scheduledPool, protocolManagerFactories,
executorFactory.getExecutor(), serviceRegistry);
@@ -4837,6 +4837,9 @@ public class ActiveMQServerImpl implements ActiveMQServer
{
recoverStoredConnectors();
+ ActiveMQServerLogger.LOGGER.reloadingConfiguration("connection
routers");
+ connectionRouterManager.update(configuration);
+
ActiveMQServerLogger.LOGGER.reloadingConfiguration("protocol
services");
updateProtocolServices();
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouter.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouter.java
index bacd96fbfe..2fdeb9d40e 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouter.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouter.java
@@ -30,11 +30,12 @@ import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
public class ConnectionRouter implements ActiveMQComponent {
- private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String CLIENT_ID_PREFIX =
ActiveMQDefaultConfiguration.DEFAULT_INTERNAL_NAMING_PREFIX + "router.client.";
@@ -54,7 +55,9 @@ public class ConnectionRouter implements ActiveMQComponent {
private final Cache cache;
- private volatile boolean started = false;
+ private volatile boolean started;
+
+ private final ReentrantReadWriteLock stateLock = new
ReentrantReadWriteLock();
public String getName() {
return name;
@@ -73,6 +76,8 @@ public class ConnectionRouter implements ActiveMQComponent {
}
public String getLocalTargetFilter() {
+ final Pattern localTargetFilter = this.localTargetFilter;
+
return localTargetFilter != null ? localTargetFilter.pattern() : null;
}
@@ -101,7 +106,6 @@ public class ConnectionRouter implements ActiveMQComponent {
return started;
}
-
public ConnectionRouter(final String name,
final KeyType keyType,
final String targetKeyFilter,
@@ -129,108 +133,138 @@ public class ConnectionRouter implements
ActiveMQComponent {
@Override
public void start() throws Exception {
- if (localTarget != null) {
- localTarget.getTarget().connect();
- }
+ stateLock.writeLock().lock();
+ try {
+ if (localTarget != null) {
+ localTarget.getTarget().connect();
+ }
- if (cache != null) {
- cache.start();
- }
+ if (cache != null) {
+ cache.start();
+ }
- if (pool != null) {
- pool.start();
- }
+ if (pool != null) {
+ pool.start();
+ }
- started = true;
+ started = true;
+ } finally {
+ stateLock.writeLock().unlock();
+ }
}
@Override
public void stop() throws Exception {
- started = false;
+ stateLock.writeLock().lock();
+ try {
+ started = false;
- if (pool != null) {
- pool.stop();
- }
+ if (pool != null) {
+ pool.stop();
+ }
- if (cache != null) {
- cache.stop();
- }
+ if (cache != null) {
+ cache.stop();
+ }
- if (localTarget != null) {
- localTarget.getTarget().disconnect();
+ if (localTarget != null) {
+ localTarget.getTarget().disconnect();
+ }
+ } finally {
+ stateLock.writeLock().unlock();
}
}
public TargetResult getTarget(Connection connection, String clientID,
String username) {
- if (clientID != null &&
clientID.startsWith(ConnectionRouter.CLIENT_ID_PREFIX)) {
- logger.debug("The clientID [{}] starts with
ConnectionRouter.CLIENT_ID_PREFIX", clientID);
+ stateLock.readLock().lock();
+ try {
+ if (!started) {
+ return TargetResult.REFUSED_UNAVAILABLE_RESULT;
+ }
- return localTarget;
- }
+ if (clientID != null &&
clientID.startsWith(ConnectionRouter.CLIENT_ID_PREFIX)) {
+ logger.debug("The clientID [{}] starts with
ConnectionRouter.CLIENT_ID_PREFIX", clientID);
+
+ return localTarget;
+ }
- return getTarget(keyResolver.resolve(connection, clientID, username));
+ return getTarget(keyResolver.resolve(connection, clientID, username));
+ } finally {
+ stateLock.readLock().unlock();
+ }
}
public TargetResult getTarget(String key) {
- if (policy != null && !KeyResolver.NULL_KEY_VALUE.equals(key)) {
- key = policy.transformKey(key);
- }
+ stateLock.readLock().lock();
+ try {
+ if (!started) {
+ return TargetResult.REFUSED_UNAVAILABLE_RESULT;
+ }
- if (this.localTargetFilter != null &&
this.localTargetFilter.matcher(key).matches()) {
- if (logger.isDebugEnabled()) {
- logger.debug("The {}[{}] matches the localTargetFilter {}",
keyType, key, localTargetFilter.pattern());
+ if (policy != null && !KeyResolver.NULL_KEY_VALUE.equals(key)) {
+ key = policy.transformKey(key);
}
- return localTarget;
- }
+ final Pattern localTargetFilter = this.localTargetFilter;
- if (policy == null || pool == null) {
- return TargetResult.REFUSED_USE_ANOTHER_RESULT;
- }
-
- TargetResult result = null;
+ if (localTargetFilter != null &&
localTargetFilter.matcher(key).matches()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("The {}[{}] matches the localTargetFilter {}",
keyType, key, localTargetFilter.pattern());
+ }
- if (cache != null) {
- final String nodeId = cache.get(key);
+ return localTarget;
+ }
- if (logger.isDebugEnabled()) {
- logger.debug("The cache returns target [{}] for {}[{}]", nodeId,
keyType, key);
+ if (policy == null || pool == null) {
+ return TargetResult.REFUSED_USE_ANOTHER_RESULT;
}
- if (nodeId != null) {
- Target target = pool.getReadyTarget(nodeId);
- if (target != null) {
- if (logger.isDebugEnabled()) {
- logger.debug("The target [{}] is ready for {}[{}]", nodeId,
keyType, key);
- }
+ TargetResult result = null;
- return new TargetResult(target);
- }
+ if (cache != null) {
+ final String nodeId = cache.get(key);
if (logger.isDebugEnabled()) {
- logger.debug("The target [{}] is not ready for {}[{}]", nodeId,
keyType, key);
+ logger.debug("The cache returns target [{}] for {}[{}]",
nodeId, keyType, key);
}
- }
- }
- final List<Target> targets = pool.getTargets();
- final Target target = policy.selectTarget(targets, key);
+ if (nodeId != null) {
+ Target target = pool.getReadyTarget(nodeId);
+ if (target != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("The target [{}] is ready for {}[{}]",
nodeId, keyType, key);
+ }
- if (logger.isDebugEnabled()) {
- logger.debug("The policy selects [{}] from {} for {}[{}]", target,
targets, keyType, key);
- }
+ return new TargetResult(target);
+ }
- if (target != null) {
- result = new TargetResult(target);
- if (cache != null) {
- if (logger.isDebugEnabled()) {
- logger.debug("Caching {}[{}] for [{}]", keyType, key, target);
+ if (logger.isDebugEnabled()) {
+ logger.debug("The target [{}] is not ready for {}[{}]",
nodeId, keyType, key);
+ }
}
+ }
- cache.put(key, target.getNodeID());
+ final List<Target> targets = pool.getTargets();
+ final Target target = policy.selectTarget(targets, key);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("The policy selects [{}] from {} for {}[{}]", target,
targets, keyType, key);
}
- }
- return Objects.requireNonNullElse(result,
TargetResult.REFUSED_UNAVAILABLE_RESULT);
+ if (target != null) {
+ result = new TargetResult(target);
+ if (cache != null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Caching {}[{}] for [{}]", keyType, key,
target);
+ }
+
+ cache.put(key, target.getNodeID());
+ }
+ }
+
+ return Objects.requireNonNullElse(result,
TargetResult.REFUSED_UNAVAILABLE_RESULT);
+ } finally {
+ stateLock.readLock().unlock();
+ }
}
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManager.java
index 1e7c3de5a4..ff6d5ac5d7 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManager.java
@@ -46,52 +46,107 @@ import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Collectors;
public final class ConnectionRouterManager implements ActiveMQComponent {
+
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String CACHE_ID_PREFIX = "$.BC.";
-
- private final Configuration config;
-
private final ActiveMQServer server;
private final ScheduledExecutorService scheduledExecutor;
private volatile boolean started = false;
- private Map<String, ConnectionRouter> connectionRouters = new HashMap<>();
+ private final ReentrantReadWriteLock stateLock = new
ReentrantReadWriteLock();
+ private Map<String, ConnectionRouterConfiguration> configurations = new
HashMap<>();
+ private Map<String, ConnectionRouter> connectionRouters = new HashMap<>();
@Override
public boolean isStarted() {
return started;
}
-
- public ConnectionRouterManager(final Configuration config, final
ActiveMQServer server, ScheduledExecutorService scheduledExecutor) {
- this.config = config;
+ public ConnectionRouterManager(final ActiveMQServer server,
ScheduledExecutorService scheduledExecutor) {
this.server = server;
this.scheduledExecutor = scheduledExecutor;
}
- public void deploy() throws Exception {
- for (ConnectionRouterConfiguration connectionRouterConfig :
config.getConnectionRouters()) {
- deployConnectionRouter(connectionRouterConfig);
+ public void deploy(Configuration config) throws Exception {
+ stateLock.writeLock().lock();
+ try {
+ for (ConnectionRouterConfiguration connectionRouterConfig :
config.getConnectionRouters()) {
+ deployConnectionRouter(connectionRouterConfig);
+ }
+ } finally {
+ stateLock.writeLock().unlock();
}
}
- public void deployConnectionRouter(ConnectionRouterConfiguration config)
throws Exception {
+ public void update(Configuration config) throws Exception {
+ stateLock.writeLock().lock();
+ try {
+ final List<ConnectionRouterConfiguration> activeConfiguration =
Objects.requireNonNullElse(config.getConnectionRouters(),
Collections.emptyList());
+
+ for (ConnectionRouterConfiguration configuration :
activeConfiguration) {
+ final ConnectionRouterConfiguration previous =
configurations.get(configuration.getName());
+
+ if (previous == null || !configuration.equals(previous)) {
+ // If this was an update and the connection router is active
meaning the manager is
+ // started then we need to stop the old one if it exists before
attempting to deploy
+ // a new version with the updated configuration.
+ final ConnectionRouter router =
connectionRouters.remove(configuration.getName());
+
+ if (router != null) {
+ router.stop();
+
server.getManagementService().unregisterConnectionRouter(router.getName());
+ }
+
+ deployConnectionRouter(configuration);
+ }
+ }
+
+ // Find any removed configurations and remove them from the current
set and stop the associated
+ // router if one is present.
+
+ final Map<String, ConnectionRouterConfiguration>
activeConfigurationsMap =
+ activeConfiguration.stream()
+ .collect(Collectors.toMap(c -> c.getName(),
Function.identity()));
+
+ final List<ConnectionRouterConfiguration> removedConfigurations =
+ configurations.values().stream().filter(c ->
!activeConfigurationsMap.containsKey(c.getName())).toList();
+
+ for (ConnectionRouterConfiguration removedConfiguration :
removedConfigurations) {
+ configurations.remove(removedConfiguration.getName());
+
+ final ConnectionRouter router =
connectionRouters.remove(removedConfiguration.getName());
+
+ if (router != null) {
+ router.stop();
+
server.getManagementService().unregisterConnectionRouter(router.getName());
+ }
+ }
+ } finally {
+ stateLock.writeLock().unlock();
+ }
+ }
+
+ ConnectionRouter deployConnectionRouter(ConnectionRouterConfiguration
config) throws Exception {
logger.debug("Deploying ConnectionRouter {}", config.getName());
Target localTarget = new LocalTarget(null, server);
-
Cache cache = null;
CacheConfiguration cacheConfiguration = config.getCacheConfiguration();
if (cacheConfiguration != null) {
@@ -113,9 +168,16 @@ public final class ConnectionRouterManager implements
ActiveMQComponent {
ConnectionRouter connectionRouter = new
ConnectionRouter(config.getName(), config.getKeyType(),
config.getKeyFilter(), localTarget, config.getLocalTargetFilter(),
cache, pool, policy);
+ configurations.put(connectionRouter.getName(), config);
connectionRouters.put(connectionRouter.getName(), connectionRouter);
server.getManagementService().registerConnectionRouter(connectionRouter);
+
+ if (isStarted()) {
+ connectionRouter.start();
+ }
+
+ return connectionRouter;
}
private Cache deployCache(CacheConfiguration configuration, String name)
throws ClassNotFoundException {
@@ -191,25 +253,40 @@ public final class ConnectionRouterManager implements
ActiveMQComponent {
}
public ConnectionRouter getRouter(String name) {
- return connectionRouters.get(name);
+ stateLock.readLock().lock();
+ try {
+ return connectionRouters.get(name);
+ } finally {
+ stateLock.readLock().unlock();
+ }
}
@Override
public void start() throws Exception {
- for (ConnectionRouter connectionRouter : connectionRouters.values()) {
- connectionRouter.start();
- }
+ stateLock.writeLock().lock();
+ try {
+ started = true;
- started = true;
+ for (ConnectionRouter connectionRouter : connectionRouters.values()) {
+ connectionRouter.start();
+ }
+ } finally {
+ stateLock.writeLock().unlock();
+ }
}
@Override
public void stop() throws Exception {
- started = false;
+ stateLock.writeLock().lock();
+ try {
+ started = false;
- for (ConnectionRouter connectionRouter : connectionRouters.values()) {
- connectionRouter.stop();
-
server.getManagementService().unregisterConnectionRouter(connectionRouter.getName());
+ for (ConnectionRouter connectionRouter : connectionRouters.values()) {
+ connectionRouter.stop();
+
server.getManagementService().unregisterConnectionRouter(connectionRouter.getName());
+ }
+ } finally {
+ stateLock.writeLock().unlock();
}
}
}
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManagerTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManagerTest.java
index cdc4f1e4e4..4fdcd6ee2b 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManagerTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterManagerTest.java
@@ -16,17 +16,30 @@
*/
package org.apache.activemq.artemis.core.server.routing;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.routing.CacheConfiguration;
import
org.apache.activemq.artemis.core.config.routing.ConnectionRouterConfiguration;
import
org.apache.activemq.artemis.core.config.routing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.routing.PoolConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import
org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashModuloPolicy;
import
org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashPolicy;
+import org.apache.activemq.artemis.core.server.routing.pools.Pool;
+import org.jgroups.util.UUID;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -42,8 +55,9 @@ public class ConnectionRouterManagerTest {
public void setUp() throws Exception {
mockServer = mock(ActiveMQServer.class);
+
Mockito.when(mockServer.getNodeID()).thenReturn(SimpleString.of(UUID.randomUUID().toString()));
- underTest = new ConnectionRouterManager(null, mockServer, null);
+ underTest = new ConnectionRouterManager(mockServer, null);
underTest.start();
}
@@ -81,7 +95,35 @@ public class ConnectionRouterManagerTest {
ConnectionRouterConfiguration connectionRouterConfiguration = new
ConnectionRouterConfiguration();
connectionRouterConfiguration.setName("partition-local-pool");
- underTest.deployConnectionRouter(connectionRouterConfiguration);
+ final ConnectionRouter router =
underTest.deployConnectionRouter(connectionRouterConfiguration);
+
+ assertNotNull(router);
+ assertTrue(router.isStarted());
+
+ underTest.stop();
+
+ assertFalse(router.isStarted());
+ }
+
+ @Test
+ public void deployLocalOnlyToUnstartedManager() throws Exception {
+
+ ManagementService mockManagementService =
Mockito.mock(ManagementService.class);
+
Mockito.when(mockServer.getManagementService()).thenReturn(mockManagementService);
+
+ ConnectionRouterConfiguration connectionRouterConfiguration = new
ConnectionRouterConfiguration();
+ connectionRouterConfiguration.setName("partition-local-pool");
+
+ underTest.stop();
+
+ final ConnectionRouter router =
underTest.deployConnectionRouter(connectionRouterConfiguration);
+
+ assertNotNull(router);
+ assertFalse(router.isStarted());
+
+ underTest.start();
+
+ assertTrue(router.isStarted());
}
@Test
@@ -97,7 +139,6 @@ public class ConnectionRouterManagerTest {
.setProperties(Collections.singletonMap(ConsistentHashModuloPolicy.MODULO,
String.valueOf(2)));
connectionRouterConfiguration.setPolicyConfiguration(policyConfig);
-
underTest.deployConnectionRouter(connectionRouterConfiguration);
}
@@ -113,9 +154,333 @@ public class ConnectionRouterManagerTest {
.setName(ConsistentHashModuloPolicy.NAME)
.setProperties(Collections.singletonMap(ConsistentHashModuloPolicy.MODULO,
String.valueOf(2)));
connectionRouterConfiguration.setPolicyConfiguration(policyConfig);
- underTest.deployConnectionRouter(connectionRouterConfiguration);
+ final ConnectionRouter router1 =
underTest.deployConnectionRouter(connectionRouterConfiguration);
connectionRouterConfiguration.setName("partition-local-consistent-hash-bis");
- underTest.deployConnectionRouter(connectionRouterConfiguration);
+ final ConnectionRouter router2 =
underTest.deployConnectionRouter(connectionRouterConfiguration);
+
+ assertEquals("partition-local-consistent-hash", router1.getName());
+ assertEquals("partition-local-consistent-hash-bis", router2.getName());
+
+ assertTrue(router1.isStarted());
+ assertTrue(router2.isStarted());
+ }
+
+   @Test
+   public void deployPolicyAndRemoveWithUpdate() throws Exception {
+      // Dropping a router from the configuration and calling update() must unregister it exactly once.
+      final ManagementService managementService = mock(ManagementService.class);
+      Mockito.when(mockServer.getManagementService()).thenReturn(managementService);
+
+      final List<ConnectionRouterConfiguration> routerConfigs = new ArrayList<>();
+      final Configuration brokerConfig = mock(Configuration.class);
+      Mockito.when(brokerConfig.getConnectionRouters()).thenReturn(routerConfigs);
+
+      final NamedPropertyConfiguration moduloPolicy = new NamedPropertyConfiguration()
+         .setName(ConsistentHashModuloPolicy.NAME)
+         .setProperties(Collections.singletonMap(ConsistentHashModuloPolicy.MODULO, String.valueOf(2)));
+      final ConnectionRouterConfiguration routerConfig = new ConnectionRouterConfiguration();
+      routerConfig.setName("partition-local-consistent-hash-1").setKeyType(KeyType.CLIENT_ID).setLocalTargetFilter(String.valueOf(2));
+      routerConfig.setPolicyConfiguration(moduloPolicy);
+      routerConfigs.add(routerConfig);
+
+      underTest.deploy(brokerConfig);
+
+      // verify(mock) defaults to times(1)
+      Mockito.verify(managementService).registerConnectionRouter(Mockito.any());
+      Mockito.verifyNoMoreInteractions(managementService);
+
+      // An empty router list means the deployed router is gone after the update.
+      routerConfigs.clear();
+      underTest.update(brokerConfig);
+
+      Mockito.verify(managementService).unregisterConnectionRouter(routerConfig.getName());
+      Mockito.verifyNoMoreInteractions(managementService);
+
+      // Stopping afterwards must not produce further management calls.
+      underTest.stop();
+      Mockito.verifyNoMoreInteractions(managementService);
+   }
+
+   @Test
+   public void deployPolicyAndAddNewPolicyWithUpdate() throws Exception {
+      // Adding a second router to the configuration and calling update() must register only the
+      // new router, without unregistering the one that was already deployed.
+      final ManagementService managementService = mock(ManagementService.class);
+      Mockito.when(mockServer.getManagementService()).thenReturn(managementService);
+
+      final List<ConnectionRouterConfiguration> routerConfigs = new ArrayList<>();
+      final Configuration brokerConfig = mock(Configuration.class);
+      Mockito.when(brokerConfig.getConnectionRouters()).thenReturn(routerConfigs);
+
+      final ConnectionRouterConfiguration firstConfig = new ConnectionRouterConfiguration();
+      firstConfig.setName("partition-local-consistent-hash-1").setKeyType(KeyType.CLIENT_ID).setLocalTargetFilter(String.valueOf(2));
+      final NamedPropertyConfiguration firstPolicy = new NamedPropertyConfiguration()
+         .setName(ConsistentHashModuloPolicy.NAME)
+         .setProperties(Collections.singletonMap(ConsistentHashModuloPolicy.MODULO, String.valueOf(2)));
+      firstConfig.setPolicyConfiguration(firstPolicy);
+      routerConfigs.add(firstConfig);
+
+      underTest.deploy(brokerConfig);
+
+      // verify(mock) defaults to times(1)
+      Mockito.verify(managementService).registerConnectionRouter(Mockito.any());
+      Mockito.verifyNoMoreInteractions(managementService);
+      Mockito.clearInvocations(managementService);
+
+      final ConnectionRouterConfiguration secondConfig = new ConnectionRouterConfiguration();
+      secondConfig.setName("partition-local-consistent-hash-2").setKeyType(KeyType.CLIENT_ID).setLocalTargetFilter(String.valueOf(2));
+      final NamedPropertyConfiguration secondPolicy = new NamedPropertyConfiguration()
+         .setName(ConsistentHashModuloPolicy.NAME)
+         .setProperties(Collections.singletonMap(ConsistentHashModuloPolicy.MODULO, String.valueOf(2)));
+      secondConfig.setPolicyConfiguration(secondPolicy);
+      routerConfigs.add(secondConfig);
+
+      underTest.update(brokerConfig);
+
+      Mockito.verify(managementService).registerConnectionRouter(Mockito.any());
+      Mockito.verifyNoMoreInteractions(managementService);
+
+      assertNotNull(underTest.getRouter("partition-local-consistent-hash-1"));
+      assertNotNull(underTest.getRouter("partition-local-consistent-hash-2"));
+
+      final ConnectionRouter firstRouter = underTest.getRouter("partition-local-consistent-hash-1");
+      final ConnectionRouter secondRouter = underTest.getRouter("partition-local-consistent-hash-2");
+
+      assertTrue(firstRouter.isStarted());
+      assertTrue(secondRouter.isStarted());
+
+      // The manager was already started in setUp(); starting again must leave both routers started.
+      underTest.start();
+
+      assertTrue(firstRouter.isStarted());
+      assertTrue(secondRouter.isStarted());
+      assertNotSame(firstRouter, secondRouter);
+
+      underTest.stop();
+
+      assertFalse(firstRouter.isStarted());
+      assertFalse(secondRouter.isStarted());
+
+      Mockito.verify(managementService).unregisterConnectionRouter(firstConfig.getName());
+      Mockito.verify(managementService).unregisterConnectionRouter(secondConfig.getName());
+   }
+
+   @Test
+   public void deployPolicyAndUpdateIt() throws Exception {
+      // Re-deploying a router under the same name with changed settings must replace the running
+      // instance: one unregister plus one register per changed configuration.
+      final ManagementService managementService = mock(ManagementService.class);
+      Mockito.when(mockServer.getManagementService()).thenReturn(managementService);
+
+      final List<ConnectionRouterConfiguration> routerConfigs = new ArrayList<>();
+      final Configuration brokerConfig = mock(Configuration.class);
+      Mockito.when(brokerConfig.getConnectionRouters()).thenReturn(routerConfigs);
+
+      final ConnectionRouterConfiguration initialConfig = new ConnectionRouterConfiguration();
+      initialConfig.setName("partition-local-consistent-hash-1").setKeyType(KeyType.CLIENT_ID).setLocalTargetFilter(String.valueOf(2));
+      final NamedPropertyConfiguration initialPolicy = new NamedPropertyConfiguration()
+         .setName(ConsistentHashModuloPolicy.NAME)
+         .setProperties(Collections.singletonMap(ConsistentHashModuloPolicy.MODULO, String.valueOf(2)));
+      initialConfig.setPolicyConfiguration(initialPolicy);
+      routerConfigs.add(initialConfig);
+
+      underTest.deploy(brokerConfig);
+
+      // verify(mock) defaults to times(1)
+      Mockito.verify(managementService).registerConnectionRouter(Mockito.any());
+      Mockito.verifyNoMoreInteractions(managementService);
+      Mockito.clearInvocations(managementService);
+
+      final ConnectionRouter original = underTest.getRouter("partition-local-consistent-hash-1");
+      assertEquals("2", original.getPolicy().getProperties().get(ConsistentHashModuloPolicy.MODULO));
+
+      // First change: same router name, modulo 2 -> 3. The old instance should be removed and a
+      // new one added in its place.
+      final ConnectionRouterConfiguration moduloChanged = new ConnectionRouterConfiguration();
+      moduloChanged.setName("partition-local-consistent-hash-1").setKeyType(KeyType.CLIENT_ID).setLocalTargetFilter(String.valueOf(2));
+      final NamedPropertyConfiguration moduloChangedPolicy = new NamedPropertyConfiguration()
+         .setName(ConsistentHashModuloPolicy.NAME)
+         .setProperties(Collections.singletonMap(ConsistentHashModuloPolicy.MODULO, String.valueOf(3)));
+      moduloChanged.setPolicyConfiguration(moduloChangedPolicy);
+
+      routerConfigs.clear();
+      routerConfigs.add(moduloChanged);
+      underTest.update(brokerConfig);
+
+      Mockito.verify(managementService).registerConnectionRouter(Mockito.any());
+      Mockito.verify(managementService).unregisterConnectionRouter(initialConfig.getName());
+      Mockito.clearInvocations(managementService);
+
+      final ConnectionRouter afterModuloChange = underTest.getRouter("partition-local-consistent-hash-1");
+      assertEquals("3", afterModuloChange.getPolicy().getProperties().get(ConsistentHashModuloPolicy.MODULO));
+      assertNull(afterModuloChange.getCache());
+      assertNotSame(afterModuloChange, original);
+
+      // Second change: same name and modulo, but a cache configuration is added.
+      final ConnectionRouterConfiguration cacheAdded = new ConnectionRouterConfiguration();
+      cacheAdded.setName("partition-local-consistent-hash-1").setKeyType(KeyType.CLIENT_ID).setLocalTargetFilter(String.valueOf(2));
+      final NamedPropertyConfiguration cacheAddedPolicy = new NamedPropertyConfiguration()
+         .setName(ConsistentHashModuloPolicy.NAME)
+         .setProperties(Collections.singletonMap(ConsistentHashModuloPolicy.MODULO, String.valueOf(3)));
+      cacheAdded.setPolicyConfiguration(cacheAddedPolicy);
+      final CacheConfiguration cacheConfig = new CacheConfiguration()
+         .setPersisted(false)
+         .setTimeout(5_000);
+      cacheAdded.setCacheConfiguration(cacheConfig);
+
+      routerConfigs.clear();
+      routerConfigs.add(cacheAdded);
+      underTest.update(brokerConfig);
+
+      Mockito.verify(managementService).registerConnectionRouter(Mockito.any());
+      Mockito.verify(managementService).unregisterConnectionRouter(initialConfig.getName());
+
+      final ConnectionRouter afterCacheAdded = underTest.getRouter("partition-local-consistent-hash-1");
+      assertEquals("3", afterCacheAdded.getPolicy().getProperties().get(ConsistentHashModuloPolicy.MODULO));
+      assertNotNull(afterCacheAdded.getCache());
+      assertNotSame(afterCacheAdded, original);
+      assertNotSame(afterCacheAdded, afterModuloChange);
+
+      underTest.stop();
+
+      // Invocations were not cleared after the second update, so the unregister caused by stop()
+      // brings the count for this name to two.
+      Mockito.verify(managementService, Mockito.times(2)).unregisterConnectionRouter(initialConfig.getName());
+      Mockito.verifyNoMoreInteractions(managementService);
+   }
+
+   @Test
+   public void deployedPolicyNotUpdatedOnUpdate() throws Exception {
+      // update() must be a no-op for an equal configuration and a remove-and-replace once any
+      // setting differs.
+      final ManagementService managementService = mock(ManagementService.class);
+      Mockito.when(mockServer.getManagementService()).thenReturn(managementService);
+
+      final List<ConnectionRouterConfiguration> routerConfigs = new ArrayList<>();
+      final Configuration brokerConfig = mock(Configuration.class);
+      Mockito.when(brokerConfig.getConnectionRouters()).thenReturn(routerConfigs);
+
+      final CacheConfiguration initialCache = new CacheConfiguration();
+      initialCache.setPersisted(false);
+      initialCache.setTimeout(1000);
+      final ConnectionRouterConfiguration initialConfig = new ConnectionRouterConfiguration();
+      initialConfig.setName("partition-local-pool");
+      initialConfig.setCacheConfiguration(initialCache);
+      routerConfigs.add(initialConfig);
+
+      underTest.deploy(brokerConfig);
+
+      // verify(mock) defaults to times(1)
+      Mockito.verify(managementService).registerConnectionRouter(Mockito.any());
+      Mockito.verifyNoMoreInteractions(managementService);
+      Mockito.clearInvocations(managementService);
+
+      // Distinct objects carrying exactly the same values as the deployed configuration.
+      final CacheConfiguration sameCache = new CacheConfiguration();
+      sameCache.setPersisted(false);
+      sameCache.setTimeout(1000);
+      final ConnectionRouterConfiguration sameConfig = new ConnectionRouterConfiguration();
+      sameConfig.setName("partition-local-pool");
+      sameConfig.setCacheConfiguration(sameCache);
+
+      routerConfigs.clear();
+      routerConfigs.add(sameConfig);
+      underTest.update(brokerConfig);
+
+      Mockito.verifyNoInteractions(managementService);
+
+      // Only the cache timeout differs; that alone requires remove and replace.
+      final CacheConfiguration changedCache = new CacheConfiguration();
+      changedCache.setPersisted(false);
+      changedCache.setTimeout(2000);
+      final ConnectionRouterConfiguration changedConfig = new ConnectionRouterConfiguration();
+      changedConfig.setName("partition-local-pool");
+      changedConfig.setCacheConfiguration(changedCache);
+
+      routerConfigs.clear();
+      routerConfigs.add(changedConfig);
+      underTest.update(brokerConfig);
+
+      Mockito.verify(managementService).unregisterConnectionRouter(changedConfig.getName());
+      Mockito.verify(managementService).registerConnectionRouter(Mockito.any());
+      Mockito.verifyNoMoreInteractions(managementService);
+      Mockito.clearInvocations(managementService);
+   }
+
+   @Test
+   public void deployWithPoolConfigurationAndUpdateIt() throws Exception {
+      // Changing pool settings of a deployed router must replace the router, and the replacement
+      // must expose a new Pool carrying the updated check period and quorum size.
+      final ManagementService managementService = mock(ManagementService.class);
+      Mockito.when(mockServer.getManagementService()).thenReturn(managementService);
+
+      final List<ConnectionRouterConfiguration> routerConfigs = new ArrayList<>();
+      final Configuration brokerConfig = mock(Configuration.class);
+      Mockito.when(brokerConfig.getConnectionRouters()).thenReturn(routerConfigs);
+      Mockito.when(mockServer.getConfiguration()).thenReturn(brokerConfig);
+
+      final NamedPropertyConfiguration initialPolicy = new NamedPropertyConfiguration();
+      initialPolicy.setName(ConsistentHashPolicy.NAME);
+
+      final PoolConfiguration initialPool = new PoolConfiguration();
+      initialPool.setLocalTargetEnabled(true);
+      initialPool.setStaticConnectors(List.of("connector1"));
+      initialPool.setCheckPeriod(10);
+      initialPool.setQuorumSize(1);
+
+      final ConnectionRouterConfiguration initialConfig = new ConnectionRouterConfiguration();
+      initialConfig.setName("test-local-pool");
+      initialConfig.setPolicyConfiguration(initialPolicy);
+      initialConfig.setPoolConfiguration(initialPool);
+      routerConfigs.add(initialConfig);
+
+      underTest.deploy(brokerConfig);
+
+      // verify(mock) defaults to times(1)
+      Mockito.verify(managementService).registerConnectionRouter(Mockito.any());
+      Mockito.verifyNoMoreInteractions(managementService);
+      Mockito.clearInvocations(managementService);
+
+      final Pool poolBefore = underTest.getRouter("test-local-pool").getPool();
+      assertEquals(10, poolBefore.getCheckPeriod());
+      assertEquals(1, poolBefore.getQuorumSize());
+
+      // Same router name and policy; only the pool's check period and quorum size change.
+      final NamedPropertyConfiguration updatedPolicy = new NamedPropertyConfiguration();
+      updatedPolicy.setName(ConsistentHashPolicy.NAME);
+
+      final PoolConfiguration updatedPool = new PoolConfiguration();
+      updatedPool.setLocalTargetEnabled(true);
+      updatedPool.setStaticConnectors(List.of("connector1"));
+      updatedPool.setCheckPeriod(100);
+      updatedPool.setQuorumSize(20);
+
+      final ConnectionRouterConfiguration updatedConfig = new ConnectionRouterConfiguration();
+      updatedConfig.setName("test-local-pool");
+      updatedConfig.setPolicyConfiguration(updatedPolicy);
+      updatedConfig.setPoolConfiguration(updatedPool);
+
+      routerConfigs.clear();
+      routerConfigs.add(updatedConfig);
+      underTest.update(brokerConfig);
+
+      Mockito.verify(managementService).registerConnectionRouter(Mockito.any());
+      Mockito.verify(managementService).unregisterConnectionRouter(initialConfig.getName());
+      Mockito.verifyNoMoreInteractions(managementService);
+      Mockito.clearInvocations(managementService);
+
+      final Pool poolAfter = underTest.getRouter("test-local-pool").getPool();
+      assertNotSame(poolBefore, poolAfter);
+      assertEquals(100, poolAfter.getCheckPeriod());
+      assertEquals(20, poolAfter.getQuorumSize());
+
+      underTest.stop();
    }
}
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterTest.java
index 4303c655e3..f1f642e6bf 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/routing/ConnectionRouterTest.java
@@ -19,6 +19,9 @@ package org.apache.activemq.artemis.core.server.routing;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
+import java.util.UUID;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.routing.policies.AbstractPolicy;
import org.apache.activemq.artemis.core.server.routing.policies.Policy;
@@ -28,6 +31,7 @@ import
org.apache.activemq.artemis.core.server.routing.targets.TargetResult;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
@@ -39,18 +43,42 @@ public class ConnectionRouterTest {
@BeforeEach
public void setUp() {
ActiveMQServer mockServer = mock(ActiveMQServer.class);
+
+ // Stub getNodeID() with a random value; presumably required now that the tests start the router -- TODO confirm.
+
Mockito.when(mockServer.getNodeID()).thenReturn(SimpleString.of(UUID.randomUUID().toString()));
+
localTarget = new LocalTarget(null, mockServer);
}
@Test
- public void getTarget() {
+ public void getTarget() throws Exception {
Policy policy = null;
underTest = new ConnectionRouter("test", KeyType.CLIENT_ID, "^.{3}",
- localTarget, "^FOO.*", null, null,
policy);
+ localTarget, "^FOO.*", null, null,
policy);
+
+ // Start the router before resolving targets; getTargetWhenNotStarted covers the unstarted case.
+ underTest.start();
+
assertEquals(localTarget, underTest.getTarget("FOO_EE").getTarget());
assertEquals(TargetResult.REFUSED_USE_ANOTHER_RESULT,
underTest.getTarget("BAR_EE"));
}
+   @Test
+   public void getTargetWhenNotStarted() throws Exception {
+      // A router that is not started (never started, or stopped again) must answer
+      // REFUSED_UNAVAILABLE_RESULT; normal routing applies only while it is started.
+      final Policy noPolicy = null;
+      underTest = new ConnectionRouter("test", KeyType.CLIENT_ID, "^.{3}", localTarget, "^FOO.*", null, null, noPolicy);
+
+      final TargetResult beforeStart = underTest.getTarget("BAR_EE");
+      assertEquals(TargetResult.REFUSED_UNAVAILABLE_RESULT, beforeStart);
+
+      underTest.start();
+
+      assertEquals(localTarget, underTest.getTarget("FOO_EE").getTarget());
+      assertEquals(TargetResult.REFUSED_USE_ANOTHER_RESULT, underTest.getTarget("BAR_EE"));
+
+      underTest.stop();
+
+      final TargetResult afterStop = underTest.getTarget("BAR_EE");
+      assertEquals(TargetResult.REFUSED_UNAVAILABLE_RESULT, afterStop);
+   }
+
@Test
public void getLocalTargetWithTransformer() throws Exception {
Policy policy = new AbstractPolicy("TEST") {
@@ -61,8 +89,9 @@ public class ConnectionRouterTest {
};
underTest = new ConnectionRouter("test", KeyType.CLIENT_ID, "^.{3}",
- localTarget, "^FOO.*", null, null,
policy);
+ localTarget, "^FOO.*", null, null,
policy);
+ underTest.start();
+
assertEquals(localTarget,
underTest.getTarget("TRANSFORM_TO_FOO_EE").getTarget());
}
-
}
diff --git a/docs/user-manual/config-reload.adoc
b/docs/user-manual/config-reload.adoc
index b8c6e828e0..87487406af 100644
--- a/docs/user-manual/config-reload.adoc
+++ b/docs/user-manual/config-reload.adoc
@@ -539,6 +539,9 @@ Below lists the effects of adding, deleting and updating of
an element/attribute
==== `<acceptors>`
Adding, updating and removing an `<acceptor>` is supported, updating or
removing an `<acceptor>` results in the closure of all connections that were
accepted previously. Added or updated acceptors are automatically started
during the configuration reload process unless the `auto-start` option is set
to false.
+==== `<connection-routers>`
+Adding, updating and removing a `<connection-router>` is supported. Updating
or removing a `<connection-router>` stops the existing connection router; if it
was updated, a new instance is then added in its place.
+
=== `<jms>` _(Deprecated)_
=== `<queues>` _(Deprecated)_
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
index 0d88ec12e4..bf1a6dd362 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
@@ -52,6 +52,7 @@ import
org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.AcceptorControl;
+import org.apache.activemq.artemis.api.core.management.ConnectionRouterControl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.postoffice.Binding;
@@ -67,6 +68,7 @@ import
org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
+import org.apache.activemq.artemis.core.server.routing.ConnectionRouter;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
@@ -1589,7 +1591,7 @@ public class RedeployTest extends ActiveMQTestBase {
assertEquals("61616",
acceptor.getParams().get(TransportConstants.PORT_PROP_NAME));
assertNull(acceptor.getParams().get(TransportConstants.AUTO_START));
- final AcceptorControl acceptorControl = (AcceptorControl)
managementService.getAcceptorControl("artemis");
+ final AcceptorControl acceptorControl =
managementService.getAcceptorControl("artemis");
assertNotNull(acceptorControl);
assertTrue(acceptorControl.isStarted());
@@ -1865,6 +1867,305 @@ public class RedeployTest extends ActiveMQTestBase {
}
}
+   /**
+    * Starts the broker from {@code reload-connection-router.xml}, swaps the file for
+    * {@code reload-connection-router-updated.xml} and verifies that after the reload the
+    * "simple-local" router is gone and "simple-local-with-transformer" is deployed and started.
+    */
+   @Test
+   public void testRedeployNewConnectionRouterAndRemovePreviousRouter() throws Exception {
+      Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
+      URL url1 = RedeployTest.class.getClassLoader().getResource("reload-connection-router.xml");
+      URL url2 = RedeployTest.class.getClassLoader().getResource("reload-connection-router-updated.xml");
+      Files.copy(url1.openStream(), brokerXML);
+
+      final String expectedRouter = "simple-local";
+      final String expectedRouterAfterUpdate = "simple-local-with-transformer";
+
+      final MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer();
+      runAfter(() -> MBeanServerFactory.releaseMBeanServer(mBeanServer));
+
+      EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
+      embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
+      embeddedActiveMQ.setMbeanServer(mBeanServer);
+      embeddedActiveMQ.start();
+
+      final ReusableLatch latch = new ReusableLatch(1);
+      final Runnable tick = latch::countDown;
+      final ManagementService managementService = embeddedActiveMQ.getActiveMQServer().getManagementService();
+
+      embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+
+      try {
+         // Fail here if the reload manager never ticks, rather than asserting against stale state.
+         assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+         final ConnectionRouterControl routerControl = managementService.getConnectionRouterControl(expectedRouter);
+
+         assertNotNull(routerControl);
+         assertNull(managementService.getConnectionRouterControl(expectedRouterAfterUpdate));
+
+         Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
+         // Push the timestamp forward so the change is detected even with coarse mtime resolution.
+         brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
+         latch.setCount(1);
+         embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+         assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+         final ConnectionRouterControl routerControlUpdated = managementService.getConnectionRouterControl(expectedRouterAfterUpdate);
+
+         assertNotNull(routerControlUpdated);
+         assertNull(managementService.getConnectionRouterControl(expectedRouter));
+
+         final ConnectionRouter router = embeddedActiveMQ.getActiveMQServer().getConnectionRouterManager().getRouter(expectedRouterAfterUpdate);
+
+         assertNotNull(router);
+         assertEquals("CLIENT_ID", router.getKey().toString());
+         assertEquals("CONSISTENT_HASH_MODULO", router.getPolicy().getName());
+         assertTrue(router.isStarted());
+      } finally {
+         embeddedActiveMQ.stop();
+      }
+   }
+
+   /**
+    * Starts the broker with an empty broker properties file, then writes a "simple-local"
+    * connection router into that file and verifies that the properties reload deploys and starts it.
+    */
+   @Test
+   public void testDeployNewConnectionRouterFromBrokerProperties() throws Exception {
+      Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
+      URL url1 = RedeployTest.class.getClassLoader().getResource("reload-acceptor.xml");
+      Path brokerProperties = getTestDirfile().toPath().resolve("broker.properties");
+      Files.copy(url1.openStream(), brokerXML);
+
+      final MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer();
+      runAfter(() -> MBeanServerFactory.releaseMBeanServer(mBeanServer));
+
+      final String expectedRouter = "simple-local";
+      final Properties properties = new ConfigurationImpl.InsertionOrderedProperties();
+
+      // Start from an empty properties file so that no router exists before the reload.
+      try (Writer propertiesWriter = Files.newBufferedWriter(brokerProperties, StandardOpenOption.WRITE,
+                                                             StandardOpenOption.CREATE,
+                                                             StandardOpenOption.TRUNCATE_EXISTING)) {
+         properties.store(propertiesWriter, null);
+      }
+
+      EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
+      embeddedActiveMQ.setPropertiesResourcePath(brokerProperties.toString());
+      embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
+      embeddedActiveMQ.setMbeanServer(mBeanServer);
+      embeddedActiveMQ.start();
+
+      final ReusableLatch latch = new ReusableLatch(1);
+      final Runnable tick = latch::countDown;
+      final ManagementService managementService = embeddedActiveMQ.getActiveMQServer().getManagementService();
+
+      embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+
+      try {
+         // Fail here if the reload manager never ticks, rather than asserting against stale state.
+         assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+         assertTrue(managementService.getConnectionRouterControls().isEmpty());
+
+         // Update the broker properties file with a new connection router, it should appear in the
+         // management service after the reload of properties.
+         properties.clear();
+         properties.put("connectionRouters.simple-local.keyFilter", "^[^.]+");
+         properties.put("connectionRouters.simple-local.keyType", "CLIENT_ID");
+         properties.put("connectionRouters.simple-local.localTargetFilter", "DEFAULT");
+         // Key and value are separate arguments; the previous key embedded "=simple-local".
+         properties.put("connectionRouters.simple-local.name", "simple-local");
+
+         try (Writer propertiesWriter = Files.newBufferedWriter(brokerProperties, StandardOpenOption.WRITE,
+                                                                StandardOpenOption.TRUNCATE_EXISTING)) {
+            properties.store(propertiesWriter, null);
+         }
+
+         latch.setCount(1);
+         embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+         assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+         assertFalse(managementService.getConnectionRouterControls().isEmpty());
+         assertNotNull(managementService.getConnectionRouterControl(expectedRouter));
+
+         final ConnectionRouterControl control = managementService.getConnectionRouterControl(expectedRouter);
+
+         assertEquals("DEFAULT", control.getLocalTargetFilter());
+         assertEquals("^[^.]+", control.getTargetKeyFilter());
+
+         final ConnectionRouter router = embeddedActiveMQ.getActiveMQServer().getConnectionRouterManager().getRouter(expectedRouter);
+
+         assertEquals("CLIENT_ID", router.getKey().toString());
+         assertTrue(router.isStarted());
+      } finally {
+         embeddedActiveMQ.stop();
+      }
+   }
+
+   /**
+    * Starts the broker with a "simple-local" connection router defined in broker properties, then
+    * rewrites the properties with a different key type and verifies that the reloaded router
+    * reports the new key type and is started.
+    */
+   @Test
+   public void testUpdateExistingConnectionRouterFromBrokerProperties() throws Exception {
+      Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
+      URL url1 = RedeployTest.class.getClassLoader().getResource("reload-acceptor.xml");
+      Path brokerProperties = getTestDirfile().toPath().resolve("broker.properties");
+      Files.copy(url1.openStream(), brokerXML);
+
+      final MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer();
+      runAfter(() -> MBeanServerFactory.releaseMBeanServer(mBeanServer));
+
+      final String expectedRouter = "simple-local";
+      final Properties properties = new ConfigurationImpl.InsertionOrderedProperties();
+
+      properties.put("connectionRouters.simple-local.keyFilter", "^[^.]+");
+      properties.put("connectionRouters.simple-local.keyType", "CLIENT_ID");
+      properties.put("connectionRouters.simple-local.localTargetFilter", "DEFAULT");
+      // Key and value are separate arguments; the previous key embedded "=simple-local".
+      properties.put("connectionRouters.simple-local.name", "simple-local");
+
+      try (Writer propertiesWriter = Files.newBufferedWriter(brokerProperties, StandardOpenOption.WRITE,
+                                                             StandardOpenOption.CREATE,
+                                                             StandardOpenOption.TRUNCATE_EXISTING)) {
+         properties.store(propertiesWriter, null);
+      }
+
+      EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
+      embeddedActiveMQ.setPropertiesResourcePath(brokerProperties.toString());
+      embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
+      embeddedActiveMQ.setMbeanServer(mBeanServer);
+      embeddedActiveMQ.start();
+
+      final ReusableLatch latch = new ReusableLatch(1);
+      final Runnable tick = latch::countDown;
+      final ManagementService managementService = embeddedActiveMQ.getActiveMQServer().getManagementService();
+
+      embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+
+      try {
+         // Fail here if the reload manager never ticks, rather than asserting against stale state.
+         assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+         assertFalse(managementService.getConnectionRouterControls().isEmpty());
+         assertNotNull(managementService.getConnectionRouterControl(expectedRouter));
+
+         final ConnectionRouterControl control = managementService.getConnectionRouterControl(expectedRouter);
+
+         assertEquals("DEFAULT", control.getLocalTargetFilter());
+         assertEquals("^[^.]+", control.getTargetKeyFilter());
+
+         final ConnectionRouter router = embeddedActiveMQ.getActiveMQServer().getConnectionRouterManager().getRouter(expectedRouter);
+
+         assertEquals("CLIENT_ID", router.getKey().toString());
+         assertTrue(router.isStarted());
+
+         // Rewrite the broker properties with a different key type for the existing router; the
+         // router found after the reload should report the new key type.
+         properties.clear();
+         properties.put("connectionRouters.simple-local.keyFilter", "^[^.]+");
+         properties.put("connectionRouters.simple-local.keyType", "SNI_HOST");
+         properties.put("connectionRouters.simple-local.localTargetFilter", "DEFAULT");
+         properties.put("connectionRouters.simple-local.name", "simple-local");
+
+         try (Writer propertiesWriter = Files.newBufferedWriter(brokerProperties, StandardOpenOption.WRITE,
+                                                                StandardOpenOption.TRUNCATE_EXISTING)) {
+            properties.store(propertiesWriter, null);
+         }
+
+         latch.setCount(1);
+         embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+         assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+         assertFalse(managementService.getConnectionRouterControls().isEmpty());
+         assertNotNull(managementService.getConnectionRouterControl(expectedRouter));
+
+         final ConnectionRouterControl updatedControl = managementService.getConnectionRouterControl(expectedRouter);
+
+         assertEquals("DEFAULT", updatedControl.getLocalTargetFilter());
+         assertEquals("^[^.]+", updatedControl.getTargetKeyFilter());
+
+         final ConnectionRouter updatedRouter = embeddedActiveMQ.getActiveMQServer().getConnectionRouterManager().getRouter(expectedRouter);
+
+         assertEquals("SNI_HOST", updatedRouter.getKey().toString());
+         assertTrue(updatedRouter.isStarted());
+      } finally {
+         embeddedActiveMQ.stop();
+      }
+   }
+
+   /**
+    * Starts the broker with a "simple-local" connection router defined in broker properties, then
+    * empties the properties file and verifies that the reload removes the router from both the
+    * management service and the connection router manager.
+    */
+   @Test
+   public void testRemoveConnectionRouterFromBrokerPropertiesUpdate() throws Exception {
+      Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
+      URL url1 = RedeployTest.class.getClassLoader().getResource("reload-acceptor.xml");
+      Path brokerProperties = getTestDirfile().toPath().resolve("broker.properties");
+      Files.copy(url1.openStream(), brokerXML);
+
+      final MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer();
+      runAfter(() -> MBeanServerFactory.releaseMBeanServer(mBeanServer));
+
+      final String expectedRouter = "simple-local";
+      final Properties properties = new ConfigurationImpl.InsertionOrderedProperties();
+
+      properties.put("connectionRouters.simple-local.keyFilter", "^[^.]+");
+      properties.put("connectionRouters.simple-local.keyType", "CLIENT_ID");
+      properties.put("connectionRouters.simple-local.localTargetFilter", "DEFAULT");
+      // Key and value are separate arguments; the previous key embedded "=simple-local".
+      properties.put("connectionRouters.simple-local.name", "simple-local");
+
+      try (Writer propertiesWriter = Files.newBufferedWriter(brokerProperties, StandardOpenOption.WRITE,
+                                                             StandardOpenOption.CREATE,
+                                                             StandardOpenOption.TRUNCATE_EXISTING)) {
+         properties.store(propertiesWriter, null);
+      }
+
+      EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
+      embeddedActiveMQ.setPropertiesResourcePath(brokerProperties.toString());
+      embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
+      embeddedActiveMQ.setMbeanServer(mBeanServer);
+      embeddedActiveMQ.start();
+
+      final ReusableLatch latch = new ReusableLatch(1);
+      final Runnable tick = latch::countDown;
+      final ManagementService managementService = embeddedActiveMQ.getActiveMQServer().getManagementService();
+
+      embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+
+      try {
+         // Fail here if the reload manager never ticks, rather than asserting against stale state.
+         assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+         assertFalse(managementService.getConnectionRouterControls().isEmpty());
+         assertNotNull(managementService.getConnectionRouterControl(expectedRouter));
+
+         // Rewrite the broker properties file without the connection router so the reload removes it.
+         properties.clear();
+
+         try (Writer propertiesWriter = Files.newBufferedWriter(brokerProperties, StandardOpenOption.WRITE,
+                                                                StandardOpenOption.TRUNCATE_EXISTING)) {
+            properties.store(propertiesWriter, null);
+         }
+
+         latch.setCount(1);
+         embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+         assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+         assertNull(managementService.getConnectionRouterControl(expectedRouter));
+         assertTrue(managementService.getConnectionRouterControls().isEmpty());
+
+         assertNull(embeddedActiveMQ.getActiveMQServer().getConnectionRouterManager().getRouter(expectedRouter));
+      } finally {
+         embeddedActiveMQ.stop();
+      }
+   }
+
private TransportConfiguration findInConfiguration(String acceptorName,
Configuration configuration) {
final Set<TransportConfiguration> acceptors =
configuration.getAcceptorConfigurations();
diff --git
a/tests/integration-tests/src/test/resources/reload-connection-router-updated.xml
b/tests/integration-tests/src/test/resources/reload-connection-router-updated.xml
new file mode 100644
index 0000000000..c03a23d61b
--- /dev/null
+++
b/tests/integration-tests/src/test/resources/reload-connection-router-updated.xml
@@ -0,0 +1,47 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core">
+
+
<configuration-file-refresh-period>100</configuration-file-refresh-period>
+
+ <persistence-enabled>false</persistence-enabled>
+ <security-enabled>false</security-enabled>
+
+ <connection-routers>
+ <connection-router name="simple-local-with-transformer">
+ <key-type>CLIENT_ID</key-type>
+ <key-filter>^[^.]+</key-filter>
+ <local-target-filter>DEFAULT</local-target-filter>
+ <policy name="CONSISTENT_HASH_MODULO">
+ <property key="MODULO" value="2"></property>
+ </policy>
+ </connection-router>
+ </connection-routers>
+
+ <acceptors>
+ <acceptor
name="artemis">tcp://127.0.0.1:61616?autoStart=false</acceptor>
+ </acceptors>
+ </core>
+</configuration>
diff --git
a/tests/integration-tests/src/test/resources/reload-connection-router.xml
b/tests/integration-tests/src/test/resources/reload-connection-router.xml
new file mode 100644
index 0000000000..f9a665133a
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/reload-connection-router.xml
@@ -0,0 +1,44 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core">
+
+
<configuration-file-refresh-period>100</configuration-file-refresh-period>
+
+ <persistence-enabled>false</persistence-enabled>
+ <security-enabled>false</security-enabled>
+
+ <connection-routers>
+ <connection-router name="simple-local">
+ <key-type>CLIENT_ID</key-type>
+ <key-filter>^[^.]+</key-filter>
+ <local-target-filter>DEFAULT</local-target-filter>
+ </connection-router>
+ </connection-routers>
+
+ <acceptors>
+ <acceptor name="artemis">tcp://127.0.0.1:61616</acceptor>
+ </acceptors>
+ </core>
+</configuration>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]