This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new bd3c057559 ARTEMIS-4251 Support CORE client failover to other live 
servers
bd3c057559 is described below

commit bd3c057559030826e1c7fc97df9efd31a5753809
Author: Domenico Francesco Bruscino <brus...@apache.org>
AuthorDate: Sat Apr 22 16:47:17 2023 +0200

    ARTEMIS-4251 Support CORE client failover to other live servers
    
    Improve the CORE client failover connecting to other live servers when all
    reconnect attempts fails, i.e. in a cluster composed of 2 live servers,
    when the server to which the CORE client is connected goes down the CORE
    client should reconnect its sessions to the other liver broker.
---
 .../artemis/api/config/ServerLocatorConfig.java    |   2 +
 .../artemis/api/core/client/ActiveMQClient.java    |   2 +
 .../artemis/api/core/client/ServerLocator.java     |  15 ++
 .../core/client/impl/ClientSessionFactoryImpl.java | 139 ++++++++++++----
 .../core/client/impl/ServerLocatorImpl.java        |  26 ++-
 .../core/client/impl/ServerLocatorInternal.java    |   4 +
 docs/user-manual/en/SUMMARY.md                     |   2 +-
 docs/user-manual/en/client-failover.md             | 161 ++++++++++++++++++
 docs/user-manual/en/client-reconnection.md         | 107 ------------
 docs/user-manual/en/configuration-index.md         |   2 +-
 docs/user-manual/en/core-bridges.md                |   4 +-
 docs/user-manual/en/ha.md                          |  59 +------
 docs/user-manual/en/send-guarantees.md             |   2 +-
 .../failover/ClientConnectorFailoverTest.java      | 185 +++++++++++++++++++++
 .../integration/cluster/failover/FailoverTest.java |  37 ++---
 .../StaticClusterWithBackupFailoverTest.java       |  18 ++
 16 files changed, 540 insertions(+), 225 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java
index 395277ab96..201ec8dd01 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ServerLocatorConfig.java
@@ -44,6 +44,7 @@ public class ServerLocatorConfig {
    public long maxRetryInterval = ActiveMQClient.DEFAULT_MAX_RETRY_INTERVAL;
    public int reconnectAttempts = ActiveMQClient.DEFAULT_RECONNECT_ATTEMPTS;
    public int initialConnectAttempts = ActiveMQClient.INITIAL_CONNECT_ATTEMPTS;
+   public int failoverAttempts = ActiveMQClient.DEFAULT_FAILOVER_ATTEMPTS;
    public int initialMessagePacketSize = 
ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
    public boolean cacheLargeMessagesClient = 
ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
    public boolean compressLargeMessage = 
ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
@@ -80,6 +81,7 @@ public class ServerLocatorConfig {
       maxRetryInterval = locator.maxRetryInterval;
       reconnectAttempts = locator.reconnectAttempts;
       initialConnectAttempts = locator.initialConnectAttempts;
+      failoverAttempts = locator.failoverAttempts;
       initialMessagePacketSize = locator.initialMessagePacketSize;
       useTopologyForLoadBalancing = locator.useTopologyForLoadBalancing;
    }
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
index 5ca294df38..f4134efbe1 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ActiveMQClient.java
@@ -113,6 +113,8 @@ public final class ActiveMQClient {
 
    public static final int INITIAL_CONNECT_ATTEMPTS = 1;
 
+   public static final int DEFAULT_FAILOVER_ATTEMPTS = 0;
+
    @Deprecated
    public static final boolean DEFAULT_FAILOVER_ON_INITIAL_CONNECTION = false;
 
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
index 7cb94353ab..2602d3f767 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ServerLocator.java
@@ -653,6 +653,21 @@ public interface ServerLocator extends AutoCloseable {
     */
    int getInitialConnectAttempts();
 
+   /**
+    * Sets the maximum number of failover attempts to establish a connection 
to other live servers after a connection failure.
+    * <p>
+    * Value must be -1 (to retry infinitely), 0 (to never retry connection) or 
greater than 0.
+    *
+    * @param attempts maximum number of failover attempts after a connection 
failure
+    * @return this ServerLocator
+    */
+   ServerLocator setFailoverAttempts(int attempts);
+
+   /**
+    * @return the number of failover attempts after a connection failure.
+    */
+   int getFailoverAttempts();
+
    /**
     * Returns true if the client will automatically attempt to connect to the 
backup server if the initial
     * connection to the live server fails
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index 9c4ecb9992..6f2fc28053 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -75,6 +75,7 @@ import 
org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.lang.invoke.MethodHandles;
+import java.util.function.BiPredicate;
 
 public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, 
ClientConnectionLifeCycleListener {
 
@@ -132,6 +133,8 @@ public class ClientSessionFactoryImpl implements 
ClientSessionFactoryInternal, C
 
    private int reconnectAttempts;
 
+   private int failoverAttempts;
+
    private final Set<SessionFailureListener> listeners = new 
ConcurrentHashSet<>();
 
    private final Set<FailoverEventListener> failoverListeners = new 
ConcurrentHashSet<>();
@@ -239,6 +242,8 @@ public class ClientSessionFactoryImpl implements 
ClientSessionFactoryInternal, C
 
       this.reconnectAttempts = reconnectAttempts;
 
+      this.failoverAttempts = locatorConfig.failoverAttempts;
+
       this.scheduledThreadPool = scheduledThreadPool;
 
       this.threadPool = threadPool;
@@ -640,7 +645,7 @@ public class ClientSessionFactoryImpl implements 
ClientSessionFactoryInternal, C
          // failoverLock
          // until failover is complete
 
-         if (reconnectAttempts != 0) {
+         if (reconnectAttempts != 0 || failoverAttempts != 0) {
 
             if (clientProtocolManager.cleanupBeforeFailover(me)) {
 
@@ -673,33 +678,96 @@ public class ClientSessionFactoryImpl implements 
ClientSessionFactoryInternal, C
                   sessionsToFailover = new HashSet<>(sessions);
                }
 
+               // Notify sessions before failover.
                for (ClientSessionInternal session : sessionsToFailover) {
                   session.preHandleFailover(connection);
                }
 
-               boolean allSessionReconnected = false;
-               int failedReconnectSessionsCounter = 0;
-               do {
-                  allSessionReconnected = 
reconnectSessions(sessionsToFailover, oldConnection, reconnectAttempts, me);
-                  if (oldConnection != null) {
-                     oldConnection.destroy();
+
+               // Try to reconnect to the current connector pair.
+               // Before ARTEMIS-4251 ClientSessionFactoryImpl only tries to 
reconnect to the current connector pair.
+               int reconnectRetries = 0;
+               boolean sessionsReconnected = false;
+               BiPredicate<Boolean, Integer> reconnectRetryPredicate =
+                  (reconnected, retries) -> clientProtocolManager.isAlive() &&
+                     !reconnected && (reconnectAttempts == -1 || retries < 
reconnectAttempts);
+               while (reconnectRetryPredicate.test(sessionsReconnected, 
reconnectRetries)) {
+
+                  int remainingReconnectRetries = reconnectAttempts == -1 ? -1 
: reconnectAttempts - reconnectRetries;
+                  reconnectRetries += 
getConnectionWithRetry(remainingReconnectRetries, oldConnection);
+
+                  if (connection != null) {
+                     sessionsReconnected = 
reconnectSessions(sessionsToFailover, oldConnection, me);
+
+                     if (!sessionsReconnected) {
+                        if (oldConnection != null) {
+                           oldConnection.destroy();
+                        }
+
+                        oldConnection = connection;
+                        connection = null;
+                     }
+                  }
+
+                  reconnectRetries++;
+                  if (reconnectRetryPredicate.test(sessionsReconnected, 
reconnectRetries)) {
+                     waitForRetry(retryInterval);
+                  }
+               }
+
+
+               // Try to connect to other connector pairs.
+               // After ARTEMIS-4251 ClientSessionFactoryImpl tries to connect 
to
+               // other connector pairs when reconnection to the current 
connector pair fails.
+               int connectorsCount = 0;
+               int failoverRetries = 0;
+               long failoverRetryInterval = retryInterval;
+               Pair<TransportConfiguration, TransportConfiguration> 
connectorPair;
+               BiPredicate<Boolean, Integer> failoverRetryPredicate =
+                  (reconnected, retries) -> clientProtocolManager.isAlive() &&
+                     !reconnected && (failoverAttempts == -1 || retries < 
failoverAttempts);
+               while (failoverRetryPredicate.test(sessionsReconnected, 
failoverRetries)) {
+
+                  connectorsCount++;
+                  connectorPair = serverLocator.selectNextConnectorPair();
+
+                  if (connectorPair != null) {
+                     connectorConfig = connectorPair.getA();
+                     currentConnectorConfig = connectorPair.getA();
+                     if (connectorPair.getB() != null) {
+                        backupConnectorConfig = connectorPair.getB();
+                     }
+
+                     getConnection();
                   }
 
-                  if (!allSessionReconnected) {
-                     failedReconnectSessionsCounter++;
-                     oldConnection = connection;
-                     connection = null;
+                  if (connection != null) {
+                     sessionsReconnected = 
reconnectSessions(sessionsToFailover, oldConnection, me);
 
-                     // Wait for retry when the connection is established but 
not all session are reconnected.
-                     if ((reconnectAttempts == -1 || 
failedReconnectSessionsCounter < reconnectAttempts) && oldConnection != null) {
-                        waitForRetry(retryInterval);
+                     if (!sessionsReconnected) {
+                        if (oldConnection != null) {
+                           oldConnection.destroy();
+                        }
+
+                        oldConnection = connection;
+                        connection = null;
+                     }
+                  }
+
+                  if (connectorsCount >= serverLocator.getConnectorsSize()) {
+                     connectorsCount = 0;
+                     failoverRetries++;
+                     if (failoverRetryPredicate.test(false, failoverRetries)) {
+                        waitForRetry(failoverRetryInterval);
+                        failoverRetryInterval = 
getNextRetryInterval(failoverRetryInterval);
                      }
                   }
                }
-               while ((reconnectAttempts == -1 || 
failedReconnectSessionsCounter < reconnectAttempts) && !allSessionReconnected);
 
+
+               // Notify sessions after failover.
                for (ClientSessionInternal session : sessionsToFailover) {
-                  session.postHandleFailover(connection, 
allSessionReconnected);
+                  session.postHandleFailover(connection, sessionsReconnected);
                }
 
                if (oldConnection != null) {
@@ -830,15 +898,12 @@ public class ClientSessionFactoryImpl implements 
ClientSessionFactoryInternal, C
     */
    private boolean reconnectSessions(final Set<ClientSessionInternal> 
sessionsToFailover,
                                      final RemotingConnection oldConnection,
-                                     final int reconnectAttempts,
                                      final ActiveMQException cause) {
-      getConnectionWithRetry(reconnectAttempts, oldConnection);
-
       if (connection == null) {
          if (!clientProtocolManager.isAlive())
             ActiveMQClientLogger.LOGGER.failedToConnectToServer();
 
-         return true;
+         return false;
       }
 
       List<FailureListener> oldListeners = oldConnection.getFailureListeners();
@@ -874,9 +939,9 @@ public class ClientSessionFactoryImpl implements 
ClientSessionFactoryInternal, C
       return !sessionFailoverError;
    }
 
-   private void getConnectionWithRetry(final int reconnectAttempts, 
RemotingConnection oldConnection) {
+   private int getConnectionWithRetry(final int reconnectAttempts, 
RemotingConnection oldConnection) {
       if (!clientProtocolManager.isAlive())
-         return;
+         return 0;
       if (logger.isTraceEnabled()) {
          logger.trace("getConnectionWithRetry::{} with retryInterval = {} 
multiplier = {}",
                       reconnectAttempts, retryInterval, 
retryIntervalMultiplier, new Exception("trace"));
@@ -897,7 +962,7 @@ public class ClientSessionFactoryImpl implements 
ClientSessionFactoryInternal, C
                
((CoreRemotingConnection)connection).setChannelVersion(((CoreRemotingConnection)oldConnection).getChannelVersion());
             }
             logger.debug("Reconnection successful");
-            return;
+            return count;
          } else {
             // Failed to get connection
 
@@ -909,7 +974,7 @@ public class ClientSessionFactoryImpl implements 
ClientSessionFactoryInternal, C
                      
ActiveMQClientLogger.LOGGER.failedToConnectToServer(reconnectAttempts);
                   }
 
-                  return;
+                  return count;
                }
 
                if (logger.isTraceEnabled()) {
@@ -917,22 +982,28 @@ public class ClientSessionFactoryImpl implements 
ClientSessionFactoryInternal, C
                }
 
                if (waitForRetry(interval))
-                  return;
+                  return count;
 
-               // Exponential back-off
-               long newInterval = (long) (interval * retryIntervalMultiplier);
-
-               if (newInterval > maxRetryInterval) {
-                  newInterval = maxRetryInterval;
-               }
-
-               interval = newInterval;
+               interval = getNextRetryInterval(interval);
             } else {
                logger.debug("Could not connect to any server. Didn't have 
reconnection configured on the ClientSessionFactory");
-               return;
+               return count;
             }
          }
       }
+
+      return count;
+   }
+
+   private long getNextRetryInterval(long retryInterval) {
+      // Exponential back-off
+      long nextRetryInterval = (long) (retryInterval * 
retryIntervalMultiplier);
+
+      if (nextRetryInterval > maxRetryInterval) {
+         nextRetryInterval = maxRetryInterval;
+      }
+
+      return nextRetryInterval;
    }
 
    @Override
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index 3dbc35468a..3fab8940fd 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -440,6 +440,15 @@ public final class ServerLocatorImpl implements 
ServerLocatorInternal, Discovery
       clusterTransportConfiguration = locator.clusterTransportConfiguration;
    }
 
+   private boolean useInitConnector() {
+      return !config.useTopologyForLoadBalancing || !receivedTopology || 
topologyArray == null || topologyArray.length == 0;
+   }
+
+   @Override
+   public Pair<TransportConfiguration, TransportConfiguration> 
selectNextConnectorPair() {
+      return selectConnector(useInitConnector());
+   }
+
    private synchronized Pair<TransportConfiguration, TransportConfiguration> 
selectConnector(boolean useInitConnector) {
       Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;
 
@@ -470,7 +479,8 @@ public final class ServerLocatorImpl implements 
ServerLocatorInternal, Discovery
       }
    }
 
-   private int getConnectorsSize() {
+   @Override
+   public int getConnectorsSize() {
       Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;
 
       flushTopology();
@@ -673,7 +683,7 @@ public final class ServerLocatorImpl implements 
ServerLocatorInternal, Discovery
          int attempts = 0;
          boolean topologyArrayTried = !config.useTopologyForLoadBalancing || 
topologyArray == null || topologyArray.length == 0;
          boolean staticTried = false;
-         boolean shouldTryStatic = !config.useTopologyForLoadBalancing || 
!receivedTopology || topologyArray == null || topologyArray.length == 0;
+         boolean shouldTryStatic = useInitConnector();
 
          while (retry && !isClosed()) {
             retry = false;
@@ -1177,6 +1187,18 @@ public final class ServerLocatorImpl implements 
ServerLocatorInternal, Discovery
       return config.initialConnectAttempts;
    }
 
+   @Override
+   public ServerLocatorImpl setFailoverAttempts(int attempts) {
+      checkWrite();
+      this.config.failoverAttempts = attempts;
+      return this;
+   }
+
+   @Override
+   public int getFailoverAttempts() {
+      return config.failoverAttempts;
+   }
+
    @Deprecated
    @Override
    public boolean isFailoverOnInitialConnection() {
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java
index 8219833703..c0306c1754 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorInternal.java
@@ -87,4 +87,8 @@ public interface ServerLocatorInternal extends ServerLocator {
    ClientProtocolManager newProtocolManager();
 
    boolean isConnectable();
+
+   int getConnectorsSize();
+
+   Pair<TransportConfiguration, TransportConfiguration> 
selectNextConnectorPair();
 }
diff --git a/docs/user-manual/en/SUMMARY.md b/docs/user-manual/en/SUMMARY.md
index 80b26d7aa7..7fadf46a19 100644
--- a/docs/user-manual/en/SUMMARY.md
+++ b/docs/user-manual/en/SUMMARY.md
@@ -59,7 +59,7 @@
 * [Broker Plugins](broker-plugins.md)
 * [Resource Limits](resource-limits.md)
 * [The JMS Bridge](jms-bridge.md)
-* [Client Reconnection and Session Reattachment](client-reconnection.md)
+* [Client Failover](client-failover.md)
 * [Diverting and Splitting Message Flows](diverts.md)
 * [Core Bridges](core-bridges.md)
 * [Transformers](transformers.md)
diff --git a/docs/user-manual/en/client-failover.md 
b/docs/user-manual/en/client-failover.md
new file mode 100644
index 0000000000..19b1e7fc9a
--- /dev/null
+++ b/docs/user-manual/en/client-failover.md
@@ -0,0 +1,161 @@
+# Client Failover
+
+Apache ActiveMQ Artemis clients can be configured to automatically
+[reconnect to the same server](#reconnect-to-the-same-server),
+[reconnect to the backup server](#reconnect-to-the-backup-server) or
+[reconnect to other live servers](#reconnect-to-other-live-servers) in the 
event
+that a failure is detected in the connection between the client and the server.
+The clients detect connection failure when they have not received any packets
+from the server within the time given by `client-failure-check-period` as 
explained
+in section [Detecting Dead Connections](connection-ttl.md).
+
+## Reconnect to the same server
+Set `reconnectAttempts` to any non-zero value to reconnect to the same server,
+for further details see
+[Reconnection and failover attributes](#client-failover-attributes).
+
+If the disconnection was due to some transient failure such as a temporary
+network outage and the target server was not restarted, then the sessions will
+still exist on the server, assuming the client hasn't been disconnected for
+more than [connection-ttl](connection-ttl.md)
+
+In this scenario, the client sessions will be automatically re-attached to the
+server sessions after the reconnection. This is done 100% transparently and the
+client can continue exactly as if nothing had happened.
+
+The way this works is as follows:
+
+As Apache ActiveMQ Artemis clients send commands to their servers they store
+each sent command in an in-memory buffer. In the case that connection failure
+occurs and the client subsequently reattaches to the same server, as part of
+the reattachment protocol the server informs the client during reattachment
+with the id of the last command it successfully received from that client.
+
+If the client has sent more commands than were received before failover it can
+replay any sent commands from its buffer so that the client and server can
+reconcile their states.Ac
+
+The size of this buffer is configured with the `confirmationWindowSize`
+parameter on the connection URL. When the server has received
+`confirmationWindowSize` bytes of commands and processed them it will send back
+a command confirmation to the client, and the client can then free up space in
+the buffer.
+
+The window is specified in bytes.
+
+Setting this parameter to `-1` disables any buffering and prevents any
+re-attachment from occurring, forcing reconnect instead. The default value for
+this parameter is `-1`. (Which means by default no auto re-attachment will
+occur)
+
+## Reconnect to the backup server
+Set `reconnectAttempts` to any non-zero value and `ha` to `true` to reconnect
+to the back server, for further details see
+[Reconnection and failover attributes](#client-failover-attributes).
+
+The clients can be configured to discover the list of live-backup
+server groups in a number of different ways. They can be configured
+explicitly or probably the most common way of doing this is to use
+*server discovery* for the client to automatically discover the list.
+For full details on how to configure server discovery, please see 
[Clusters](clusters.md).
+Alternatively, the clients can explicitly connect to a specific server
+and download the current servers and backups see [Clusters](clusters.md).
+
+By default, failover will only occur after at least one connection has
+been made to the live server. In other words, by default, failover will
+not occur if the client fails to make an initial connection to the live
+server - in this case it will simply retry connecting to the live server
+according to the reconnect-attempts property and fail after this number
+of attempts.
+
+## Reconnect to other live servers
+Set `failoverAttempts` to any non-zero value to reconnect to other live 
servers,
+for further details see
+[Reconnection and failover attributes](#client-failover-attributes).
+
+If `reconnectAttempts` value is not zero then the client will try to reconnect
+to other live servers only after all attempts to
+[reconnect to the same server](#reconnect-to-the-same-server) or
+[reconnect to the backup server](#reconnect-to-the-backup-server) fail.
+
+## Session reconnection
+
+When clients [reconnect to the same server](#reconnect-to-the-same-server)
+after a restart, [reconnect to the backup 
server](#reconnect-to-the-backup-server)
+or [reconnect to other live servers](#reconnect-to-other-live-servers) any 
sessions
+will no longer exist on the server and it won't be possible to 100% 
transparently
+re-attach to them. In this case, any sessions and consumers on the client will 
be
+automatically recreated on the server.
+
+Client reconnection is also used internally by components such as core bridges
+to allow them to reconnect to their target servers.
+
+## Failing over on the initial connection
+
+Since the client does not learn about the full topology until after the
+first connection is made there is a window where it does not know about
+the backup. If a failure happens at this point the client can only try
+reconnecting to the original live server. To configure how many attempts
+the client will make you can set the URL parameter `initialConnectAttempts`.
+The default for this is `0`, that is try only once. Once the number of
+attempts has been made an exception will be thrown.
+
+For examples of automatic failover with transacted and non-transacted
+JMS sessions, please see [the examples](examples.md) chapter.
+
+## Reconnection and failover attributes
+
+Client reconnection and failover is configured using the following parameters:
+
+- `retryInterval`. This optional parameter determines the period in
+  milliseconds between subsequent reconnection attempts, if the connection to
+  the target server has failed. The default value is `2000` milliseconds.
+
+- `retryIntervalMultiplier`. This optional parameter determines a multiplier
+  to apply to the time since the last retry to compute the time to the next
+  retry.
+
+  This allows you to implement an *exponential backoff* between retry attempts.
+
+  Let's take an example:
+
+  If we set `retryInterval` to `1000` ms and we set `retryIntervalMultiplier`
+  to `2.0`, then, if the first reconnect attempt fails, we will wait `1000` ms
+  then `2000` ms then `4000` ms between subsequent reconnection attempts.
+
+  The default value is `1.0` meaning each reconnect attempt is spaced at equal
+  intervals.
+
+- `maxRetryInterval`. This optional parameter determines the maximum retry
+  interval that will be used. When setting `retryIntervalMultiplier` it would
+  otherwise be possible that subsequent retries exponentially increase to
+  ridiculously large values. By setting this parameter you can set an upper 
limit
+  on that value. The default value is `2000` milliseconds.
+
+- `ha`. This optional parameter determines weather the client will try to
+  reconnect to the backup node when the live node is not reachable.
+  The default value is `false`.
+  For more information on HA, please see [High Availability and 
Failover](ha.md).
+
+- `reconnectAttempts`. This optional parameter determines the total number of
+  reconnect attempts to make to the current live/backup pair before giving up.
+  A value of `-1` signifies an unlimited number of attempts.
+  The default value is `0`.
+
+- `failoverAttempts`. This optional parameter determines the total number of
+  failover attempts to make after a reconnection failure before giving up and
+  shutting down. A value of `-1` signifies an unlimited number of attempts.
+  The default value is `0`.
+
+All of these parameters are set on the URL used to connect to the broker.
+
+If your client does manage to reconnect but the session is no longer available
+on the server, for instance if the server has been restarted or it has timed
+out, then the client won't be able to re-attach, and any `ExceptionListener` or
+`FailureListener` instances registered on the connection or session will be
+called.
+
+## ExceptionListeners and SessionFailureListeners
+
+Please note, that when a client reconnects or re-attaches, any registered JMS
+`ExceptionListener` or core API `SessionFailureListener` will be called.
diff --git a/docs/user-manual/en/client-reconnection.md 
b/docs/user-manual/en/client-reconnection.md
deleted file mode 100644
index 8f87877db1..0000000000
--- a/docs/user-manual/en/client-reconnection.md
+++ /dev/null
@@ -1,107 +0,0 @@
-# Client Reconnection and Session Reattachment
-
-Apache ActiveMQ Artemis clients can be configured to automatically reconnect or
-re-attach to the server in the event that a failure is detected in the
-connection between the client and the server.
-
-## 100% Transparent session re-attachment
-
-If the disconnection was due to some transient failure such as a temporary
-network outage and the target server was not restarted, then the sessions will
-still exist on the server, assuming the client hasn't been disconnected for
-more than [connection-ttl](connection-ttl.md)
-
-In this scenario, Apache ActiveMQ Artemis will automatically re-attach the
-client sessions to the server sessions when the connection reconnects. This is
-done 100% transparently and the client can continue exactly as if nothing had
-happened.
-
-The way this works is as follows:
-
-As Apache ActiveMQ Artemis clients send commands to their servers they store
-each sent command in an in-memory buffer. In the case that connection failure
-occurs and the client subsequently reattaches to the same server, as part of
-the reattachment protocol the server informs the client during reattachment
-with the id of the last command it successfully received from that client.
-
-If the client has sent more commands than were received before failover it can
-replay any sent commands from its buffer so that the client and server can
-reconcile their states.Ac
-
-The size of this buffer is configured with the `confirmationWindowSize`
-parameter on the connection URL. When the server has received
-`confirmationWindowSize` bytes of commands and processed them it will send back
-a command confirmation to the client, and the client can then free up space in
-the buffer.
-
-The window is specified in bytes.
-
-Setting this parameter to `-1` disables any buffering and prevents any
-re-attachment from occurring, forcing reconnect instead. The default value for
-this parameter is `-1`. (Which means by default no auto re-attachment will
-occur)
-
-## Session reconnection
-
-Alternatively, the server might have actually been restarted after crashing or
-being stopped. In this case any sessions will no longer exist on the server and
-it won't be possible to 100% transparently re-attach to them.
-
-In this case, the Apache ActiveMQ Artemis client will automatically reconnect
-and *recreate* any sessions and consumers on the server corresponding to the
-sessions and consumers on the client. This process is exactly the same as what
-happens during failover onto a backup server.
-
-Client reconnection is also used internally by components such as core bridges
-to allow them to reconnect to their target servers.
-
-Please see the section on failover [Automatic Client Failover](ha.md) to get a
-full understanding of how transacted and non-transacted sessions are
-reconnected during failover/reconnect and what you need to do to maintain *once
-and only once* delivery guarantees.
-
-## Configuring reconnection/reattachment attributes
-
-Client reconnection is configured using the following parameters:
-
-- `retryInterval`. This optional parameter determines the period in
-  milliseconds between subsequent reconnection attempts, if the connection to
-  the target server has failed. The default value is `2000` milliseconds.
-
-- `retryIntervalMultiplier`. This optional parameter determines a multiplier
-  to apply to the time since the last retry to compute the time to the next
-  retry.
-
-  This allows you to implement an *exponential backoff* between retry attempts.
-
-  Let's take an example:
-
-  If we set `retryInterval` to `1000` ms and we set `retryIntervalMultiplier`
-  to `2.0`, then, if the first reconnect attempt fails, we will wait `1000` ms
-  then `2000` ms then `4000` ms between subsequent reconnection attempts.
-
-  The default value is `1.0` meaning each reconnect attempt is spaced at equal
-  intervals.
-
-- `maxRetryInterval`. This optional parameter determines the maximum retry
-  interval that will be used. When setting `retryIntervalMultiplier` it would
-  otherwise be possible that subsequent retries exponentially increase to
-  ridiculously large values. By setting this parameter you can set an upper 
limit
-  on that value. The default value is `2000` milliseconds.
-
-- `reconnectAttempts`. This optional parameter determines the total number of
-  reconnect attempts to make before giving up and shutting down. A value of
-  `-1` signifies an unlimited number of attempts. The default value is `0`.
-
-All of these parameters are set on the URL used to connect to the broker.
-
-If your client does manage to reconnect but the session is no longer available
-on the server, for instance if the server has been restarted or it has timed
-out, then the client won't be able to re-attach, and any `ExceptionListener` or
-`FailureListener` instances registered on the connection or session will be
-called.
-
-## ExceptionListeners and SessionFailureListeners
-
-Please note, that when a client reconnects or re-attaches, any registered JMS
-`ExceptionListener` or core API `SessionFailureListener` will be called.
diff --git a/docs/user-manual/en/configuration-index.md 
b/docs/user-manual/en/configuration-index.md
index 55818d3940..4891634eae 100644
--- a/docs/user-manual/en/configuration-index.md
+++ b/docs/user-manual/en/configuration-index.md
@@ -362,7 +362,7 @@ Name | Description | Default
 [use-duplicate-detection](clusters.md)| should duplicate detection headers be 
inserted in forwarded messages? | `true`
 [message-load-balancing](clusters.md) | how should messages be load balanced? 
| `OFF`
 [max-hops](clusters.md)| maximum number of hops cluster topology is 
propagated. | 1
-[confirmation-window-size](client-reconnection.md#client-reconnection-and-session-reattachment)|
 The size (in bytes) of the window used for confirming data from the server 
connected to. | 1048576
+[confirmation-window-size](client-failover.md#reconnect-to-the-same-server)| 
The size (in bytes) of the window used for confirming data from the server 
connected to. | 1048576
 [producer-window-size](clusters.md)| Flow Control for the Cluster connection 
bridge. | -1 (disabled)
 [call-failover-timeout](clusters.md#configuring-cluster-connections)| How long 
to wait for a reply if in the middle of a fail-over. -1 means wait forever. | -1
 [notification-interval](clusters.md) | how often the cluster connection will 
notify the cluster of its existence right after joining the cluster. | 1000
diff --git a/docs/user-manual/en/core-bridges.md 
b/docs/user-manual/en/core-bridges.md
index 71da1fcecf..e7653c9bd0 100644
--- a/docs/user-manual/en/core-bridges.md
+++ b/docs/user-manual/en/core-bridges.md
@@ -151,8 +151,8 @@ Let's take a look at all the parameters in turn:
 
 - `confirmation-window-size`. This optional parameter determines the
   `confirmation-window-size` to use for the connection used to forward messages
-  to the target node. This attribute is described in section [Reconnection and
-  Session Reattachment](client-reconnection.md)
+  to the target node. This attribute is described in section
+  [Client failover attributes](client-failover.md#client-failover-attributes)
 
   > **Warning**
   >
diff --git a/docs/user-manual/en/ha.md b/docs/user-manual/en/ha.md
index 03f26fa30d..193a2b76af 100644
--- a/docs/user-manual/en/ha.md
+++ b/docs/user-manual/en/ha.md
@@ -957,24 +957,7 @@ transactions are there for the client when it reconnects. 
The normal
 reconnect settings apply when the client is reconnecting so these should
 be high enough to deal with the time needed to scale down.
 
-## Failover Modes
-
-Apache ActiveMQ Artemis defines two types of client failover:
-
-- Automatic client failover
-
-- Application-level client failover
-
-Apache ActiveMQ Artemis also provides 100% transparent automatic reattachment 
of
-connections to the same server (e.g. in case of transient network
-problems). This is similar to failover, except it is reconnecting to the
-same server and is discussed in [Client Reconnection and Session 
Reattachment](client-reconnection.md)
-
-During failover, if the client has consumers on any non persistent or
-temporary queues, those queues will be automatically recreated on the backup 
node, 
-since the backup node will not have any knowledge of non persistent queues.
-
-### Automatic Client Failover
+## Client Failover
 
 Apache ActiveMQ Artemis clients can be configured to receive knowledge of all 
live and
 backup servers, so that in event of connection failure at the client -
@@ -982,45 +965,7 @@ live server connection, the client will detect this and 
reconnect to the
 backup server. The backup server will then automatically recreate any
 sessions and consumers that existed on each connection before failover,
 thus saving the user from having to hand-code manual reconnection logic.
-
-Apache ActiveMQ Artemis clients detect connection failure when it has not 
received
-packets from the server within the time given by
-`client-failure-check-period` as explained in section [Detecting Dead 
Connections](connection-ttl.md). If the client
-does not receive data in good time, it will assume the connection has
-failed and attempt failover. Also if the socket is closed by the OS,
-usually if the server process is killed rather than the machine itself
-crashing, then the client will failover straight away.
-
-Apache ActiveMQ Artemis clients can be configured to discover the list of 
live-backup
-server groups in a number of different ways. They can be configured
-explicitly or probably the most common way of doing this is to use
-*server discovery* for the client to automatically discover the list.
-For full details on how to configure server discovery, please see 
[Clusters](clusters.md).
-Alternatively, the clients can explicitly connect to a specific server
-and download the current servers and backups see [Clusters](clusters.md).
-
-To enable automatic client failover, the client must be configured to
-allow non-zero reconnection attempts (as explained in [Client Reconnection and 
Session Reattachment](client-reconnection.md)).
-
-By default failover will only occur after at least one connection has
-been made to the live server. In other words, by default, failover will
-not occur if the client fails to make an initial connection to the live
-server - in this case it will simply retry connecting to the live server
-according to the reconnect-attempts property and fail after this number
-of attempts.
-
-#### Failing over on the Initial Connection
-
-Since the client does not learn about the full topology until after the
-first connection is made there is a window where it does not know about
-the backup. If a failure happens at this point the client can only try
-reconnecting to the original live server. To configure how many attempts
-the client will make you can set the URL parameter `initialConnectAttempts`.
-The default for this is `0`, that is try only once. Once the number of
-attempts has been made an exception will be thrown.
-
-For examples of automatic failover with transacted and non-transacted
-JMS sessions, please see [the examples](examples.md) chapter.
+For further details see [Client Failover](client-failover.md)
 
 #### A Note on Server Replication
 
diff --git a/docs/user-manual/en/send-guarantees.md 
b/docs/user-manual/en/send-guarantees.md
index da631e47ef..b9b75b41f8 100644
--- a/docs/user-manual/en/send-guarantees.md
+++ b/docs/user-manual/en/send-guarantees.md
@@ -112,7 +112,7 @@ successfully reached the server.
 
 The window size for send acknowledgements is determined by the
 confirmation-window-size parameter on the connection factory or client
-session factory. Please see [Client Reconnection and Session 
Reattachment](client-reconnection.md) for more info on this.
+session factory. Please see [Client Failover](client-failover.md) for more 
info on this.
 
 To use the feature using the core API, you implement the interface
 `org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler` and 
set
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClientConnectorFailoverTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClientConnectorFailoverTest.java
index 7bdbea7a90..e86998fe08 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClientConnectorFailoverTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ClientConnectorFailoverTest.java
@@ -28,6 +28,7 @@ import java.util.Map;
 
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
@@ -36,6 +37,7 @@ import 
org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.jms.client.ActiveMQSession;
@@ -98,6 +100,7 @@ public class ClientConnectorFailoverTest extends 
StaticClusterWithBackupFailover
                }
 
                crashAndWaitForFailure(getServer(serverIdBeforeCrash), 
clientSession);
+
                Assert.assertEquals(backupConnector.getName(), 
sessionFactory.getConnectorConfiguration().getName());
                Assert.assertEquals(TEST_PARAM, 
sessionFactory.getConnectorConfiguration().getExtraParams().get(TEST_PARAM));
 
@@ -119,6 +122,188 @@ public class ClientConnectorFailoverTest extends 
StaticClusterWithBackupFailover
       }
    }
 
+   @Test
+   public void testConsumerAfterFailoverWithRedistribution() throws Exception {
+      setupCluster();
+
+      AddressSettings testAddressSettings = new 
AddressSettings().setRedistributionDelay(0);
+      for (int i : getServerIDs()) {
+         
getServer(i).getAddressSettingsRepository().addMatch(QUEUES_TESTADDRESS, 
testAddressSettings);
+      }
+
+      startServers(getLiveServerIDs());
+      startServers(getBackupServerIDs());
+
+      for (int i : getLiveServerIDs()) {
+         waitForTopology(servers[i], 3, 3);
+      }
+
+      for (int i : getBackupServerIDs()) {
+         waitForFailoverTopology(i, 0, 1, 2);
+      }
+
+      for (int i : getLiveServerIDs()) {
+         setupSessionFactory(i, i + 3, isNetty(), false);
+         createQueue(i, QUEUES_TESTADDRESS, QUEUE_NAME, null, true);
+      }
+
+      List<TransportConfiguration> transportConfigList = new ArrayList<>();
+      for (int i : getLiveServerIDs()) {
+         Map<String, Object> params = generateParams(i, isNetty());
+         TransportConfiguration serverToTC = 
createTransportConfiguration("node" + i, isNetty(), false, params);
+         serverToTC.getExtraParams().put(TEST_PARAM, TEST_PARAM);
+         transportConfigList.add(serverToTC);
+      }
+      TransportConfiguration[] transportConfigs = 
transportConfigList.toArray(new 
TransportConfiguration[transportConfigList.size()]);
+
+      try (ServerLocator serverLocator = new ServerLocatorImpl(false, 
transportConfigs)) {
+         serverLocator.setFailoverAttempts(3);
+         serverLocator.setReconnectAttempts(0);
+         serverLocator.setUseTopologyForLoadBalancing(false);
+
+         try (ClientSessionFactory sessionFactory = 
serverLocator.createSessionFactory()) {
+            try (ClientSession clientSession = sessionFactory.createSession()) 
{
+               clientSession.start();
+
+               int serverIdBeforeCrash = Integer.parseInt(sessionFactory.
+                  getConnectorConfiguration().getName().substring(4));
+
+               QueueControl testQueueControlBeforeCrash = 
(QueueControl)getServer(serverIdBeforeCrash).
+                  getManagementService().getResource(ResourceNames.QUEUE + 
QUEUE_NAME);
+
+               Assert.assertEquals(0, 
testQueueControlBeforeCrash.getMessageCount());
+
+               try (ClientProducer clientProducer = 
clientSession.createProducer(QUEUES_TESTADDRESS)) {
+                  clientProducer.send(clientSession.createMessage(true));
+                  clientProducer.send(clientSession.createMessage(true));
+               }
+
+               Assert.assertEquals(2, 
testQueueControlBeforeCrash.getMessageCount());
+
+               try (ClientConsumer clientConsumer = 
clientSession.createConsumer(QUEUE_NAME)) {
+                  ClientMessage messageBeforeCrash = 
clientConsumer.receive(3000);
+                  Assert.assertNotNull(messageBeforeCrash);
+                  messageBeforeCrash.acknowledge();
+                  clientSession.commit();
+
+                  Assert.assertEquals(1, 
testQueueControlBeforeCrash.getMessageCount());
+
+                  crashAndWaitForFailure(getServer(serverIdBeforeCrash), 
clientSession);
+
+                  Assert.assertEquals(TEST_PARAM, 
sessionFactory.getConnectorConfiguration().getExtraParams().get(TEST_PARAM));
+
+                  int serverIdAfterCrash = Integer.parseInt(sessionFactory.
+                     getConnectorConfiguration().getName().substring(4));
+                  Assert.assertNotEquals(serverIdBeforeCrash, 
serverIdAfterCrash);
+
+                  Assert.assertTrue(isLiveServerID(serverIdAfterCrash));
+
+                  QueueControl testQueueControlAfterCrash = 
(QueueControl)getServer(serverIdAfterCrash).
+                     getManagementService().getResource(ResourceNames.QUEUE + 
QUEUE_NAME);
+
+                  Wait.waitFor(() -> 
testQueueControlAfterCrash.getMessageCount() == 1, 3000);
+
+                  Assert.assertNotNull(clientConsumer.receive());
+               }
+
+               clientSession.stop();
+            }
+         }
+      }
+   }
+
+   @Test
+   public void testAutoCreatedQueueAfterFailoverWithoutHA() throws Exception {
+      setupCluster();
+
+      startServers(getLiveServerIDs());
+
+      for (int i : getLiveServerIDs()) {
+         waitForTopology(servers[i], 3, 0);
+      }
+
+      for (int i : getLiveServerIDs()) {
+         setupSessionFactory(i, i + 3, isNetty(), false);
+      }
+
+      List<TransportConfiguration> transportConfigList = new ArrayList<>();
+      for (int i : getLiveServerIDs()) {
+         Map<String, Object> params = generateParams(i, isNetty());
+         TransportConfiguration serverToTC = 
createTransportConfiguration("node" + i, isNetty(), false, params);
+         serverToTC.getExtraParams().put(TEST_PARAM, TEST_PARAM);
+         transportConfigList.add(serverToTC);
+      }
+      TransportConfiguration[] transportConfigs = 
transportConfigList.toArray(new 
TransportConfiguration[transportConfigList.size()]);
+
+      try (ServerLocator serverLocator = new ServerLocatorImpl(false, 
transportConfigs)) {
+         serverLocator.setFailoverAttempts(3);
+         serverLocator.setReconnectAttempts(0);
+         serverLocator.setUseTopologyForLoadBalancing(false);
+
+         try (ClientSessionFactory sessionFactory = 
serverLocator.createSessionFactory()) {
+            try (ClientSession clientSession = sessionFactory.createSession()) 
{
+               clientSession.start();
+
+               TransportConfiguration backupConnector = 
(TransportConfiguration) ((ClientSessionFactoryImpl) 
sessionFactory).getBackupConnector();
+               Assert.assertNull(backupConnector);
+
+               int serverIdBeforeCrash = 
Integer.parseInt(sessionFactory.getConnectorConfiguration().getName().substring(4));
+
+               createQueue(serverIdBeforeCrash, QUEUES_TESTADDRESS, 
QUEUE_NAME, null, false);
+
+               QueueControl testQueueControlBeforeCrash = (QueueControl) 
getServer(serverIdBeforeCrash).getManagementService().getResource(ResourceNames.QUEUE
 + QUEUE_NAME);
+               Assert.assertEquals(0, 
testQueueControlBeforeCrash.getMessageCount());
+
+               for (int i : getLiveServerIDs()) {
+                  if (i != serverIdBeforeCrash) {
+                     
Assert.assertNull(getServer(i).getManagementService().getResource(ResourceNames.QUEUE
 + QUEUE_NAME));
+                  }
+               }
+
+               try (ClientConsumer clientConsumer = 
clientSession.createConsumer(QUEUE_NAME)) {
+                  try (ClientProducer clientProducer = 
clientSession.createProducer(QUEUES_TESTADDRESS)) {
+                     clientProducer.send(clientSession.createMessage(true));
+                  }
+
+                  Wait.waitFor(() -> 
testQueueControlBeforeCrash.getMessageCount() == 1, 3000);
+
+                  Assert.assertNotNull(clientConsumer.receive(3000));
+
+                  crashAndWaitForFailure(getServer(serverIdBeforeCrash), 
clientSession);
+
+                  Assert.assertEquals(TEST_PARAM, 
sessionFactory.getConnectorConfiguration().getExtraParams().get(TEST_PARAM));
+
+                  int serverIdAfterCrash = 
Integer.parseInt(sessionFactory.getConnectorConfiguration().getName().substring(4));
+                  Assert.assertNotEquals(serverIdBeforeCrash, 
serverIdAfterCrash);
+
+                  boolean serverIdAfterCrashFound = false;
+                  for (int i : getLiveServerIDs()) {
+                     if (i == serverIdAfterCrash) {
+                        serverIdAfterCrashFound = true;
+                     }
+                  }
+                  Assert.assertTrue(serverIdAfterCrashFound);
+
+                  QueueControl testQueueControlAfterCrash = (QueueControl) 
getServer(serverIdAfterCrash).getManagementService().getResource(ResourceNames.QUEUE
 + QUEUE_NAME);
+                  Assert.assertNotNull(testQueueControlAfterCrash);
+                  Assert.assertEquals(0, 
testQueueControlAfterCrash.getMessageCount());
+
+                  try (ClientProducer clientProducer = 
clientSession.createProducer(QUEUES_TESTADDRESS)) {
+                     clientProducer.send(clientSession.createMessage(true));
+
+                     Wait.waitFor(() -> 
testQueueControlAfterCrash.getMessageCount() == 1, 3000);
+                     Assert.assertEquals(1, 
testQueueControlAfterCrash.getMessageCount());
+
+                     Assert.assertNotNull(clientConsumer.receive(3000));
+                  }
+
+                  clientSession.stop();
+               }
+            }
+         }
+      }
+   }
+
    @Test
    public void testJMSConsumerAfterFailover() throws Exception {
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
index ea4945e5c9..4228e3e65e 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
@@ -92,6 +92,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.lang.invoke.MethodHandles;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class FailoverTest extends FailoverTestBase {
 
@@ -1991,36 +1992,32 @@ public class FailoverTest extends FailoverTestBase {
 
       sf = createSessionFactoryAndWaitForTopology(locator, 2);
 
-      final AtomicBoolean channelLockedDuringFailover = new 
AtomicBoolean(false);
+      final int reconnectFailures = 3;
+      final AtomicInteger reconnectRetries = new AtomicInteger(0);
+      final AtomicBoolean channelLockedDuringFailover = new 
AtomicBoolean(true);
 
       ClientSession session = createSession(sf, true, true, 0);
 
-      backupServer.addInterceptor(
-         new Interceptor() {
-            private int index = 0;
-
-            @Override
-            public boolean intercept(Packet packet, RemotingConnection 
connection) throws ActiveMQException {
-               if (index < 1 && packet.getType() == PacketImpl.CREATESESSION) {
-                  sf.getConnection().addCloseListener(() -> {
-                     index++;
-                     ActiveMQSessionContext sessionContext = 
(ActiveMQSessionContext)((ClientSessionInternal)session).getSessionContext();
-                     
channelLockedDuringFailover.set(sessionContext.getSessionChannel().isLocked());
-                  });
-
-                  Channel sessionChannel = 
((RemotingConnectionImpl)connection).getChannel(ChannelImpl.CHANNEL_ID.SESSION.id,
 -1);
-                  sessionChannel.send(new ActiveMQExceptionMessage(new 
ActiveMQInternalErrorException()));
-                  return false;
-               }
-               return true;
+      backupServer.addInterceptor((packet, connection) -> {
+         if (packet.getType() == PacketImpl.CREATESESSION) {
+            if (reconnectRetries.getAndIncrement() < reconnectFailures) {
+               Channel sessionChannel = 
((RemotingConnectionImpl)connection).getChannel(ChannelImpl.CHANNEL_ID.SESSION.id,
 -1);
+               sessionChannel.send(new ActiveMQExceptionMessage(new 
ActiveMQInternalErrorException()));
+               return false;
             }
-         });
+
+            ActiveMQSessionContext sessionContext = 
(ActiveMQSessionContext)((ClientSessionInternal)session).getSessionContext();
+            channelLockedDuringFailover.compareAndSet(true, 
sessionContext.getSessionChannel().isLocked());
+         }
+         return true;
+      });
 
       session.start();
 
       crash(session);
 
       Assert.assertTrue(channelLockedDuringFailover.get());
+      Assert.assertEquals(reconnectFailures + 1, reconnectRetries.get());
    }
 
    @Test(timeout = 120000)
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java
index 72c2f51f06..c96b0d9ed0 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/StaticClusterWithBackupFailoverTest.java
@@ -28,10 +28,28 @@ public class StaticClusterWithBackupFailoverTest extends 
ClusterWithBackupFailov
       return new int[]{0, 1, 2};
    }
 
+   protected boolean isLiveServerID(int id) {
+      for (int i : getLiveServerIDs()) {
+         if (i == id) {
+            return true;
+         }
+      }
+      return false;
+   }
+
    protected int[] getBackupServerIDs() {
       return new int[]{3, 4, 5};
    }
 
+   protected boolean isBackupServerID(int id) {
+      for (int i : getBackupServerIDs()) {
+         if (i == id) {
+            return true;
+         }
+      }
+      return false;
+   }
+
    @Override
    protected void setupCluster(final MessageLoadBalancingType 
messageLoadBalancingType) throws Exception {
       setupClusterConnectionWithBackups("cluster0", "queues", 
messageLoadBalancingType, 1, isNetty(), 0, new int[]{1, 2});

Reply via email to