This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new bb08a573eb ARTEMIS-3640 Use client connectors for HA
bb08a573eb is described below
commit bb08a573ebdd414abcebad9223a599360443facf
Author: Domenico Francesco Bruscino <[email protected]>
AuthorDate: Mon Mar 6 16:44:19 2023 +0100
ARTEMIS-3640 Use client connectors for HA
---
.../artemis/api/core/TransportConfiguration.java | 2 +
.../core/client/impl/ClientSessionFactoryImpl.java | 36 +++-
.../core/client/impl/ServerLocatorImpl.java | 2 +-
.../connector/TCPTransportConfigurationSchema.java | 12 +-
...nnectorTransportConfigurationParserURITest.java | 22 +++
.../artemis/tests/util/ActiveMQTestBase.java | 9 +-
docs/user-manual/en/ha.md | 41 +++++
.../cluster/distribution/ClusterTestBase.java | 7 +-
.../failover/ClientConnectorFailoverTest.java | 192 +++++++++++++++++++++
.../StaticClusterWithBackupFailoverTest.java | 12 ++
10 files changed, 325 insertions(+), 10 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
index 5a823a8369..048ea51ffc 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java
@@ -47,6 +47,8 @@ public class TransportConfiguration implements Serializable {
private static final long serialVersionUID = -3994528421527392679L;
+ public static final String NAME_PARAM = "name";
+
public static final String EXTRA_PROPERTY_PREFIX = "$.EP.";
private String name;
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index ac6b70c7f0..1f21d3351b 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -82,7 +82,7 @@ public class ClientSessionFactoryImpl implements
ClientSessionFactoryInternal, C
private final ClientProtocolManager clientProtocolManager;
- private final TransportConfiguration connectorConfig;
+ private TransportConfiguration connectorConfig;
private TransportConfiguration previousConnectorConfig;
@@ -158,6 +158,8 @@ public class ClientSessionFactoryImpl implements
ClientSessionFactoryInternal, C
private final Object connectionReadyLock = new Object();
+ private final TransportConfiguration[] connectorConfigs;
+
public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
final TransportConfiguration
connectorConfig,
final ServerLocatorConfig locatorConfig,
@@ -171,6 +173,19 @@ public class ClientSessionFactoryImpl implements
ClientSessionFactoryInternal, C
scheduledThreadPool, incomingInterceptors,
outgoingInterceptors);
}
+ ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
+ final Pair<TransportConfiguration, TransportConfiguration>
connectorConfig,
+ final ServerLocatorConfig locatorConfig,
+ final int reconnectAttempts,
+ final Executor threadPool,
+ final ScheduledExecutorService scheduledThreadPool,
+ final List<Interceptor> incomingInterceptors,
+ final List<Interceptor> outgoingInterceptors) {
+ this(serverLocator, connectorConfig,
+ locatorConfig, reconnectAttempts, threadPool,
+ scheduledThreadPool, incomingInterceptors, outgoingInterceptors,
null);
+ }
+
ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
final Pair<TransportConfiguration,
TransportConfiguration> connectorConfig,
final ServerLocatorConfig locatorConfig,
@@ -178,7 +193,8 @@ public class ClientSessionFactoryImpl implements
ClientSessionFactoryInternal, C
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool,
final List<Interceptor> incomingInterceptors,
- final List<Interceptor> outgoingInterceptors) {
+ final List<Interceptor> outgoingInterceptors,
+ final TransportConfiguration[] connectorConfigs) {
createTrace = new Exception();
this.serverLocator = serverLocator;
@@ -238,6 +254,8 @@ public class ClientSessionFactoryImpl implements
ClientSessionFactoryInternal, C
if (connectorConfig.getB() != null) {
this.backupConnectorConfig = connectorConfig.getB();
}
+
+ this.connectorConfigs = connectorConfigs;
}
@Override
@@ -1150,6 +1168,20 @@ public class ClientSessionFactoryImpl implements
ClientSessionFactoryInternal, C
if (backupConnectorConfig != null) {
+ //Try to connect with the client connector that matches the backup
connector name
+ String backupConnectorName = backupConnectorConfig.getName();
+ if (backupConnectorName != null && connectorConfigs != null) {
+ for (TransportConfiguration connectorConfig : connectorConfigs)
{
+ if (backupConnectorName.equals(connectorConfig.getName())) {
+ //Try to connect with the matching client connector configuration
+ transportConnection = createTransportConnection("backup",
connectorConfig);
+ if (transportConnection != null) {
+ return transportConnection;
+ }
+ }
+ }
+ }
+
//Try to connect with the backup connector configuration
transportConnection = createTransportConnection("backup",
backupConnectorConfig);
if (transportConnection != null) {
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index 127a3ea0f4..fa6b6b0dfb 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -678,7 +678,7 @@ public final class ServerLocatorImpl implements
ServerLocatorInternal, Discovery
// try each factory in the list until we find one which works
try {
- factory = new ClientSessionFactoryImpl(this, tc, config,
config.reconnectAttempts, threadPool, scheduledThreadPool,
incomingInterceptors, outgoingInterceptors);
+ factory = new ClientSessionFactoryImpl(this, tc, config,
config.reconnectAttempts, threadPool, scheduledThreadPool,
incomingInterceptors, outgoingInterceptors, initialConnectors);
try {
addToConnecting(factory);
// We always try to connect here with only one attempt,
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java
index e962a5d211..99c8da1391 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/uri/schema/connector/TCPTransportConfigurationSchema.java
@@ -65,7 +65,11 @@ public class TCPTransportConfigurationSchema extends
AbstractTransportConfigurat
BeanSupport.setData(uri, props, allowableProperties, query, extraProps);
List<TransportConfiguration> transportConfigurations = new ArrayList<>();
- TransportConfiguration config = new TransportConfiguration(factoryName,
props, name, extraProps);
+ String nameFromQuery =
query.getOrDefault(TransportConfiguration.NAME_PARAM, name);
+ if (name != null && !name.equals(nameFromQuery)) {
+ throw new IllegalArgumentException("Name doesn't match query param");
+ }
+ TransportConfiguration config = new TransportConfiguration(factoryName,
props, nameFromQuery, extraProps);
transportConfigurations.add(config);
String connectors = uri.getFragment();
@@ -77,8 +81,10 @@ public class TCPTransportConfigurationSchema extends
AbstractTransportConfigurat
HashMap<String, Object> newProps = new HashMap<>();
extraProps = new HashMap<>();
BeanSupport.setData(extraUri, newProps, allowableProperties,
query, extraProps);
- BeanSupport.setData(extraUri, newProps, allowableProperties,
parseQuery(extraUri.getQuery(), null), extraProps);
- transportConfigurations.add(new
TransportConfiguration(factoryName, newProps, name + ":" + extraUri.toString(),
extraProps));
+ Map<String, String> extraUriQuery =
parseQuery(extraUri.getQuery(), null);
+ BeanSupport.setData(extraUri, newProps, allowableProperties,
extraUriQuery, extraProps);
+ String extraUriNameFromQuery =
extraUriQuery.getOrDefault(TransportConfiguration.NAME_PARAM, name + ":" +
extraUri);
+ transportConfigurations.add(new
TransportConfiguration(factoryName, newProps, extraUriNameFromQuery,
extraProps));
}
}
return transportConfigurations;
diff --git
a/artemis-core-client/src/test/java/org/apache/activemq/artemis/tests/uri/ConnectorTransportConfigurationParserURITest.java
b/artemis-core-client/src/test/java/org/apache/activemq/artemis/tests/uri/ConnectorTransportConfigurationParserURITest.java
index 8d82fbb577..6c8f5bc859 100644
---
a/artemis-core-client/src/test/java/org/apache/activemq/artemis/tests/uri/ConnectorTransportConfigurationParserURITest.java
+++
b/artemis-core-client/src/test/java/org/apache/activemq/artemis/tests/uri/ConnectorTransportConfigurationParserURITest.java
@@ -51,4 +51,26 @@ public class ConnectorTransportConfigurationParserURITest {
Assert.assertEquals("3", objects.get(2).getParams().get("port"));
}
+ @Test
+ public void testParseMultipleConnectorWithName() throws Exception {
+ ConnectorTransportConfigurationParser parser = new
ConnectorTransportConfigurationParser(false);
+
+ URI transportURI =
parser.expandURI("(tcp://live:1?name=live1,tcp://backupA:2?name=backupA2,tcp://backupB:3?name=backupB3");
+ System.out.println(transportURI);
+ List<TransportConfiguration> objects = parser.newObject(transportURI,
null);
+ if (logger.isInfoEnabled()) {
+ objects.forEach(t -> logger.info("transportConfig: {}", t));
+ }
+
+ Assert.assertEquals(3, objects.size());
+ Assert.assertEquals("live1", objects.get(0).getName());
+ Assert.assertEquals("live", objects.get(0).getParams().get("host"));
+ Assert.assertEquals("1", objects.get(0).getParams().get("port"));
+ Assert.assertEquals("backupA2", objects.get(1).getName());
+ Assert.assertEquals("backupA", objects.get(1).getParams().get("host"));
+ Assert.assertEquals("2", objects.get(1).getParams().get("port"));
+ Assert.assertEquals("backupB3", objects.get(2).getName());
+ Assert.assertEquals("backupB", objects.get(2).getParams().get("host"));
+ Assert.assertEquals("3", objects.get(2).getParams().get("port"));
+ }
}
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 95015ba719..4454d61b71 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -1338,6 +1338,13 @@ public abstract class ActiveMQTestBase extends Assert {
protected static final TransportConfiguration
createTransportConfiguration(boolean netty,
boolean acceptor,
Map<String, Object> params) {
+ return
createTransportConfiguration(UUIDGenerator.getInstance().generateStringUUID(),
netty, acceptor, params);
+ }
+
+ protected static final TransportConfiguration
createTransportConfiguration(String name,
+
boolean netty,
+
boolean acceptor,
+
Map<String, Object> params) {
String className;
if (netty) {
if (acceptor) {
@@ -1354,7 +1361,7 @@ public abstract class ActiveMQTestBase extends Assert {
}
if (params == null)
params = new HashMap<>();
- return new TransportConfiguration(className, params,
UUIDGenerator.getInstance().generateStringUUID(), new HashMap<String,
Object>());
+ return new TransportConfiguration(className, params, name, new
HashMap<String, Object>());
}
protected void waitForServerToStart(ActiveMQServer server) throws
InterruptedException {
diff --git a/docs/user-manual/en/ha.md b/docs/user-manual/en/ha.md
index 694875c9f5..03f26fa30d 100644
--- a/docs/user-manual/en/ha.md
+++ b/docs/user-manual/en/ha.md
@@ -1150,6 +1150,47 @@ If you wish to provide *once and only once* delivery
guarantees for non
transacted sessions too, enable duplicate detection, and catch unblock
exceptions as described in [Handling Blocking Calls During Failover](ha.md)
+#### Use client connectors to fail over
+
+Apache ActiveMQ Artemis clients retrieve the backup connector from the
+topology updates that the cluster brokers send. If the connection options
+of the clients don't match the options of the cluster brokers, the clients
+can define a client connector that will be used in place of the connector
+in the topology. A client connector must have a name that matches the name
+of the connector referenced by the cluster connection of the broker. For example,
+given a live broker with the cluster connector name `node-0`
+and a backup broker with the cluster connector name `node-1`, the client
+connection URL must define 2 connectors with the names `node-0` and `node-1`:
+
+Live broker config
+```xml
+<connectors>
+ <!-- Connector used to be announced through cluster connections and
notifications -->
+ <connector name="node-0">tcp://localhost:61616</connector>
+</connectors>
+<cluster-connections>
+<cluster-connection name="my-cluster">
+ <connector-ref>node-0</connector-ref>
+...
+```
+
+Backup broker config
+```xml
+<connectors>
+ <!-- Connector used to be announced through cluster connections and
notifications -->
+ <connector name="node-1">tcp://localhost:61617</connector>
+</connectors>
+<cluster-connections>
+<cluster-connection name="my-cluster">
+ <connector-ref>node-1</connector-ref>
+ ...
+```
+
+Client connection URL
+```
+(tcp://localhost:61616?name=node-0,tcp://localhost:61617?name=node-1)?ha=true&reconnectAttempts=-1
+```
+
### Getting Notified of Connection Failure
JMS provides a standard mechanism for getting notified asynchronously of
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
index a40083d402..0bfe403fc1 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
@@ -1957,13 +1957,14 @@ public abstract class ClusterTestBase extends
ActiveMQTestBase {
throw new IllegalStateException("No server at node " + nodeFrom);
}
- TransportConfiguration connectorFrom =
createTransportConfiguration(netty, false, generateParams(nodeFrom, netty));
- serverFrom.getConfiguration().getConnectorConfigurations().put(name,
connectorFrom);
+ String connectorName = "node" + nodeFrom;
+ TransportConfiguration connectorFrom =
createTransportConfiguration(connectorName, netty, false,
generateParams(nodeFrom, netty));
+
serverFrom.getConfiguration().getConnectorConfigurations().put(connectorName,
connectorFrom);
List<String> pairs = getClusterConnectionTCNames(netty, serverFrom,
nodesTo);
Configuration config = serverFrom.getConfiguration();
- ClusterConnectionConfiguration clusterConf = new
ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(name).setRetryInterval(250).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs);
+ ClusterConnectionConfiguration clusterConf = new
ClusterConnectionConfiguration().setName(name).setAddress(address).setConnectorName(connectorName).setRetryInterval(250).setMessageLoadBalancingType(messageLoadBalancingType).setMaxHops(maxHops).setConfirmationWindowSize(1024).setStaticConnectors(pairs);
config.getClusterConfigurations().add(clusterConf);
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClientConnectorFailoverTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClientConnectorFailoverTest.java
new file mode 100644
index 0000000000..7bdbea7a90
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClientConnectorFailoverTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.failover;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
+import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.client.ActiveMQSession;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ClientConnectorFailoverTest extends
StaticClusterWithBackupFailoverTest {
+
+ private static final String TEST_PARAM = "TEST";
+
+ @Override
+ protected boolean isNetty() {
+ return true;
+ }
+
+ @Test
+ public void testConsumerAfterFailover() throws Exception {
+ setupCluster();
+ startServers(getLiveServerIDs());
+ startServers(getBackupServerIDs());
+
+ for (int i : getLiveServerIDs()) {
+ waitForTopology(servers[i], 3, 3);
+ }
+
+ for (int i : getBackupServerIDs()) {
+ waitForFailoverTopology(i, 0, 1, 2);
+ }
+
+ for (int i : getLiveServerIDs()) {
+ setupSessionFactory(i, i + 3, isNetty(), false);
+ createQueue(i, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
+ }
+
+ List<TransportConfiguration> transportConfigList = new ArrayList<>();
+ for (int i : getServerIDs()) {
+ Map<String, Object> params = generateParams(i, isNetty());
+ TransportConfiguration serverToTC =
createTransportConfiguration("node" + i, isNetty(), false, params);
+ serverToTC.getExtraParams().put(TEST_PARAM, TEST_PARAM);
+ transportConfigList.add(serverToTC);
+ }
+ TransportConfiguration[] transportConfigs =
transportConfigList.toArray(new
TransportConfiguration[transportConfigList.size()]);
+
+ try (ServerLocator serverLocator = new ServerLocatorImpl(true,
transportConfigs)) {
+ serverLocator.setReconnectAttempts(-1);
+ try (ClientSessionFactory sessionFactory =
serverLocator.createSessionFactory()) {
+ try (ClientSession clientSession = sessionFactory.createSession())
{
+ clientSession.start();
+
+ TransportConfiguration backupConnector =
(TransportConfiguration)
+
((ClientSessionFactoryImpl)sessionFactory).getBackupConnector();
+ Assert.assertNotEquals(backupConnector.getName(),
sessionFactory.getConnectorConfiguration().getName());
+
+ int serverIdBeforeCrash = Integer.parseInt(sessionFactory.
+ getConnectorConfiguration().getName().substring(4));
+
+ try (ClientProducer clientProducer =
clientSession.createProducer(QUEUES_TESTADDRESS)) {
+ clientProducer.send(clientSession.createMessage(true));
+ }
+
+ crashAndWaitForFailure(getServer(serverIdBeforeCrash),
clientSession);
+ Assert.assertEquals(backupConnector.getName(),
sessionFactory.getConnectorConfiguration().getName());
+ Assert.assertEquals(TEST_PARAM,
sessionFactory.getConnectorConfiguration().getExtraParams().get(TEST_PARAM));
+
+ int serverIdAfterCrash = Integer.parseInt(sessionFactory.
+ getConnectorConfiguration().getName().substring(4));
+ Assert.assertNotEquals(serverIdBeforeCrash, serverIdAfterCrash);
+
+ try (ClientConsumer clientConsumer =
clientSession.createConsumer(QUEUE_NAME)) {
+ Assert.assertNotNull(clientConsumer.receive(3000));
+ }
+
+ QueueControl testQueueControlAfterCrash =
(QueueControl)getServer(serverIdAfterCrash).
+ getManagementService().getResource(ResourceNames.QUEUE +
QUEUE_NAME);
+ Wait.waitFor(() -> testQueueControlAfterCrash.getMessageCount()
== 0, 3000);
+
+ clientSession.stop();
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testJMSConsumerAfterFailover() throws Exception {
+
+ setupCluster();
+ startServers(getLiveServerIDs());
+ startServers(getBackupServerIDs());
+
+ for (int i : getLiveServerIDs()) {
+ waitForTopology(servers[i], 3, 3);
+ }
+
+ for (int i : getBackupServerIDs()) {
+ waitForFailoverTopology(i, 0, 1, 2);
+ }
+
+ StringBuilder connectionURL = new StringBuilder();
+ connectionURL.append("(");
+ for (int i : getServerIDs()) {
+ connectionURL.append("tcp://localhost:");
+
connectionURL.append(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT
+ i);
+ connectionURL.append("?name=node");
+ connectionURL.append(i);
+ connectionURL.append("&");
+ connectionURL.append(TEST_PARAM);
+ connectionURL.append("=");
+ connectionURL.append(TEST_PARAM);
+ connectionURL.append(",");
+ }
+ connectionURL.replace(connectionURL.length() - 1,
connectionURL.length(), ")");
+ connectionURL.append( "?ha=true&reconnectAttempts=-1");
+
+ ConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(connectionURL.toString());
+
+ try (Connection connection = connectionFactory.createConnection()) {
+ connection.start();
+ try (Session session = connection.createSession()) {
+ ClientSessionFactory sessionFactory =
((ActiveMQConnection)connection).getSessionFactory();
+ TransportConfiguration backupConnector = (TransportConfiguration)
+ ((ClientSessionFactoryImpl)sessionFactory).getBackupConnector();
+ Assert.assertNotEquals(backupConnector.getName(),
sessionFactory.getConnectorConfiguration().getName());
+
+ int serverIdBeforeCrash = Integer.parseInt(sessionFactory.
+ getConnectorConfiguration().getName().substring(4));
+
+ Queue testQueue = session.createQueue(QUEUE_NAME);
+
+ try (MessageProducer producer = session.createProducer(testQueue))
{
+ producer.send(session.createTextMessage(TEST_PARAM));
+ }
+
+ ClientSession clientSession =
((ActiveMQSession)session).getCoreSession();
+ crashAndWaitForFailure(getServer(serverIdBeforeCrash),
clientSession);
+ Assert.assertEquals(backupConnector.getName(),
sessionFactory.getConnectorConfiguration().getName());
+ Assert.assertEquals(TEST_PARAM,
sessionFactory.getConnectorConfiguration().getExtraParams().get(TEST_PARAM));
+
+ int serverIdAfterCrash = Integer.parseInt(sessionFactory.
+ getConnectorConfiguration().getName().substring(4));
+ Assert.assertNotEquals(serverIdBeforeCrash, serverIdAfterCrash);
+
+ try (MessageConsumer messageConsumer =
session.createConsumer(testQueue)) {
+ Assert.assertNotNull(messageConsumer.receive(3000));
+ }
+
+ QueueControl testQueueControlAfterCrash =
(QueueControl)getServer(serverIdAfterCrash).
+ getManagementService().getResource(ResourceNames.QUEUE +
QUEUE_NAME);
+ Wait.waitFor(() -> testQueueControlAfterCrash.getMessageCount() ==
0, 3000);
+ }
+ connection.stop();
+ }
+ }
+}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java
index be5d841035..72c2f51f06 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java
@@ -20,6 +20,18 @@ import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancing
public class StaticClusterWithBackupFailoverTest extends
ClusterWithBackupFailoverTestBase {
+ protected int[] getServerIDs() {
+ return new int[]{0, 1, 2, 3, 4, 5};
+ }
+
+ protected int[] getLiveServerIDs() {
+ return new int[]{0, 1, 2};
+ }
+
+ protected int[] getBackupServerIDs() {
+ return new int[]{3, 4, 5};
+ }
+
@Override
protected void setupCluster(final MessageLoadBalancingType
messageLoadBalancingType) throws Exception {
setupClusterConnectionWithBackups("cluster0", "queues",
messageLoadBalancingType, 1, isNetty(), 0, new int[]{1, 2});