Repository: activemq-artemis Updated Branches: refs/heads/2.6.x 0cd3fac1e -> 20ff68df8
ARTEMIS-2160: Addressed occurance where cluster configuration on server locator was hard coded. Covered with test. (cherry picked from commit 5f74faa34a805ebf3222f7b05d2735fbbeae50bd) Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/20ff68df Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/20ff68df Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/20ff68df Branch: refs/heads/2.6.x Commit: 20ff68df80faa9bbd850ad6c0ebca6078751a107 Parents: 0cd3fac Author: Roddie Kieley <[email protected]> Authored: Wed Oct 31 15:04:07 2018 -0230 Committer: Justin Bertram <[email protected]> Committed: Fri Nov 2 11:17:24 2018 -0500 ---------------------------------------------------------------------- .../core/server/cluster/ClusterController.java | 11 ++- .../cluster/ClusterControllerTest.java | 85 +++++++++++++++++++- .../cluster/distribution/ClusterTestBase.java | 74 ++++++++++++----- 3 files changed, 146 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20ff68df/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java index 03eb243..15cf04e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterController.java @@ -185,8 +185,11 @@ public class ClusterController implements ActiveMQComponent { serverLocator.setConnectionTTL(config.getConnectionTTL()); serverLocator.setClientFailureCheckPeriod(config.getClientFailureCheckPeriod()); //if the cluster isn't available we want to hang around until it is - serverLocator.setReconnectAttempts(-1); - serverLocator.setInitialConnectAttempts(-1); + serverLocator.setReconnectAttempts(config.getReconnectAttempts()); + serverLocator.setInitialConnectAttempts(config.getInitialConnectAttempts()); + serverLocator.setRetryInterval(config.getRetryInterval()); + serverLocator.setRetryIntervalMultiplier(config.getRetryIntervalMultiplier()); + serverLocator.setMaxRetryInterval(config.getMaxRetryInterval()); //this is used for replication so need to use the server packet decoder serverLocator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(serverLocator)); serverLocator.setThreadPools(server.getThreadPool(), server.getScheduledPool()); @@ -438,4 +441,8 @@ public class ClusterController implements ActiveMQComponent { return this.replicationLocator; } + public ServerLocator getServerLocator(SimpleString name) { + return locators.get(name); + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20ff68df/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java index 91857ef..f7cbd62 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java @@ -16,12 +16,19 @@ */ package org.apache.activemq.artemis.tests.integration.cluster; +import java.util.List; import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; +import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory; import org.apache.activemq.artemis.core.server.cluster.ClusterControl; import org.apache.activemq.artemis.core.server.cluster.ClusterController; +import org.apache.activemq.artemis.core.server.cluster.ClusterManager; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; import org.junit.Before; @@ -29,6 +36,9 @@ import org.junit.Test; public class ClusterControllerTest extends ClusterTestBase { + private ClusterConnectionConfiguration clusterConf0; + private ClusterConnectionConfiguration clusterConf1; + @Override @Before public void setUp() throws Exception { @@ -45,13 +55,74 @@ public class ClusterControllerTest extends ClusterTestBase { getServer(1).getConfiguration().setClusterPassword("something different"); - setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 0); - setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 1); + clusterConf0 = new ClusterConnectionConfiguration() + .setName("cluster0") + .setAddress("queues") + .setMessageLoadBalancingType(MessageLoadBalancingType.ON_DEMAND) + .setMaxHops(1) + .setInitialConnectAttempts(8) + .setReconnectAttempts(10) + .setRetryInterval(250) + .setMaxRetryInterval(4000) + .setRetryIntervalMultiplier(2.0); + + clusterConf1 = new ClusterConnectionConfiguration() + .setName("cluster0") + .setAddress("queues") + .setMessageLoadBalancingType(MessageLoadBalancingType.ON_DEMAND) + .setMaxHops(1) + .setInitialConnectAttempts(8) + .setReconnectAttempts(10) + .setRetryInterval(250) + .setMaxRetryInterval(4000) + .setRetryIntervalMultiplier(2.0); + + setupClusterConnection(clusterConf0, true, 0); + setupClusterConnection(clusterConf1, true, 1); startServers(0); startServers(1); } + private boolean clusterConnectionConfigurationIsSameBeforeAfterStart(ClusterConnectionConfiguration clusterConnectionConfigurationBeforeStart, int node) { + boolean clusterConnectionConfigurationIsSame = false; + + Configuration serverNodeConfiguration = getServer(node).getConfiguration(); + ActiveMQServer serverNode = getServer(node); + ClusterManager clusterManager = serverNode.getClusterManager(); + ClusterController clusterController = clusterManager.getClusterController(); + ServerLocator serverNodeLocator = clusterController.getServerLocator(new SimpleString(clusterConnectionConfigurationBeforeStart.getName())); + List<ClusterConnectionConfiguration> serverNodeClusterConnectionConfigurations = serverNodeConfiguration.getClusterConfigurations(); + + do { + if (serverNodeLocator.getInitialConnectAttempts() != clusterConnectionConfigurationBeforeStart.getInitialConnectAttempts()) { + break; + } + + if (serverNodeLocator.getReconnectAttempts() != clusterConnectionConfigurationBeforeStart.getReconnectAttempts()) { + break; + } + + if (serverNodeLocator.getRetryInterval() != clusterConnectionConfigurationBeforeStart.getRetryInterval()) { + break; + } + if (serverNodeLocator.getMaxRetryInterval() != clusterConnectionConfigurationBeforeStart.getMaxRetryInterval()) { + break; + } + + Double serverNodeClusterConnectionConfigurationRIM = serverNodeLocator.getRetryIntervalMultiplier(); + Double clusterConnectionConfigurationBeforeStartRIM = clusterConnectionConfigurationBeforeStart.getRetryIntervalMultiplier(); + if (0 != serverNodeClusterConnectionConfigurationRIM.compareTo(clusterConnectionConfigurationBeforeStartRIM)) { + break; + } + + clusterConnectionConfigurationIsSame = true; + } + while (false); + + return clusterConnectionConfigurationIsSame; + } + @Test public void controlWithDifferentConnector() throws Exception { try (ServerLocatorImpl locator = (ServerLocatorImpl) createInVMNonHALocator()) { @@ -76,4 +147,14 @@ public class ClusterControllerTest extends ClusterTestBase { } } } + + @Test + public void verifyServerLocatorsClusterConfiguration() { + if (false == clusterConnectionConfigurationIsSameBeforeAfterStart(clusterConf0, 0)) { + fail("serverLocator is not configured as per clusterConf0"); + } + if (false == clusterConnectionConfigurationIsSameBeforeAfterStart(clusterConf1, 1)) { + fail("serverLocator is not configured as per clusterConf1"); + } + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20ff68df/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java index 89d5175..2ba0a7b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java @@ -1737,6 +1737,36 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { setupClusterConnection(name, address, messageLoadBalancingType, maxHops, netty, null, nodeFrom, nodesTo); } + private List<String> getClusterConnectionTCNames(boolean netty, ActiveMQServer serverFrom, int[] nodesTo) { + List<String> pairs = new ArrayList<>(); + for (int element : nodesTo) { + TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty)); + serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc); + pairs.add(serverTotc.getName()); + } + return pairs; + } + + protected void setupClusterConnection(ClusterConnectionConfiguration clusterConf, + final boolean netty, + final int nodeFrom, + final int... nodesTo) { + ActiveMQServer serverFrom = servers[nodeFrom]; + + if (serverFrom == null) { + throw new IllegalStateException("No server at node " + nodeFrom); + } + + TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty)); + serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom); + + List<String> pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo); + Configuration config = serverFrom.getConfiguration(); + clusterConf.setConnectorName(connectorFrom.getName()).setConfirmationWindowSize(1024).setStaticConnectors(pairs); + + config.getClusterConfigurations().add(clusterConf); + } + protected void setupClusterConnection(final String name, final String address, final MessageLoadBalancingType messageLoadBalancingType, @@ -1754,12 +1784,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty)); serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom); - List<String> pairs = new ArrayList<>(); - for (int element : nodesTo) { - TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty)); - serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc); - pairs.add(serverTotc.getName()); - } + List<String> pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo); Configuration config = serverFrom.getConfiguration(); ClusterConnectionConfiguration clusterConf = createClusterConfig(name, address, messageLoadBalancingType, maxHops, connectorFrom, pairs); @@ -1787,15 +1812,21 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty)); serverFrom.getConfiguration().getConnectorConfigurations().put(connectorFrom.getName(), connectorFrom); - List<String> pairs = new ArrayList<>(); - for (int element : nodesTo) { - TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty)); - serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc); - pairs.add(serverTotc.getName()); - } + List<String> pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo); Configuration config = serverFrom.getConfiguration(); - ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorFrom.getName()).setRetryInterval(retryInterval).setReconnectAttempts(reconnectAttempts).setCallTimeout(100).setCallFailoverTimeout(100).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs); + ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration() + .setName(name) + .setAddress(address) + .setConnectorName(connectorFrom.getName()) + .setRetryInterval(retryInterval) + .setReconnectAttempts(reconnectAttempts) + .setCallTimeout(100) + .setCallFailoverTimeout(100) + .setMessageLoadBalancingType(messageLoadBalancingType) + .setMaxHops(maxHops) + .setConfirmationWindowSize(1024) + .setStaticConnectors(pairs); config.getClusterConfigurations().add(clusterConf); } @@ -1806,7 +1837,15 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { final int maxHops, TransportConfiguration connectorFrom, List<String> pairs) { - return new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorFrom.getName()).setRetryInterval(250).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs); + return new ClusterConnectionConfiguration() + .setName(name) + .setAddress(address) + .setConnectorName(connectorFrom.getName()) + .setRetryInterval(250) + .setMessageLoadBalancingType(messageLoadBalancingType) + .setMaxHops(maxHops) + .setConfirmationWindowSize(1024) + .setStaticConnectors(pairs); } protected void setupClusterConnectionWithBackups(final String name, @@ -1825,12 +1864,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { TransportConfiguration connectorFrom = createTransportConfiguration(netty, false, generateParams(nodeFrom, netty)); serverFrom.getConfiguration().getConnectorConfigurations().put(name, connectorFrom); - List<String> pairs = new ArrayList<>(); - for (int element : nodesTo) { - TransportConfiguration serverTotc = createTransportConfiguration(netty, false, generateParams(element, netty)); - serverFrom.getConfiguration().getConnectorConfigurations().put(serverTotc.getName(), serverTotc); - pairs.add(serverTotc.getName()); - } + List<String> pairs = getClusterConnectionTCNames(netty, serverFrom, nodesTo); Configuration config = serverFrom.getConfiguration(); ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(name).setRetryInterval(250).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs);
