This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit e4c5ea719fb531d23a8eccbf8aa01f55975e69c0
Author: Francesco Nigro <[email protected]>
AuthorDate: Wed Apr 8 16:56:14 2020 +0200

    ARTEMIS-2713 Added ReplicatedMultipleFailbackTest
---
 .../org/apache/activemq/artemis/utils/Wait.java    |   3 +
 .../cluster/qourum/SharedNothingBackupQuorum.java  | 148 +++++-----
 .../server/impl/SharedNothingBackupActivation.java |   2 +-
 tests/smoke-tests/pom.xml                          |  82 ++++++
 .../servers/replicated-failback-master1/broker.xml | 137 ++++++++++
 .../replicated-failback-master1/management.xml     |  20 ++
 .../servers/replicated-failback-master2/broker.xml | 138 ++++++++++
 .../replicated-failback-master2/management.xml     |  20 ++
 .../servers/replicated-failback-master3/broker.xml | 138 ++++++++++
 .../replicated-failback-master3/management.xml     |  20 ++
 .../servers/replicated-failback-slave1/broker.xml  | 137 ++++++++++
 .../replicated-failback-slave1/management.xml      |  20 ++
 .../artemis/tests/smoke/common/SmokeTestBase.java  |  19 +-
 .../ReplicatedMultipleFailbackTest.java            | 300 +++++++++++++++++++++
 14 files changed, 1115 insertions(+), 69 deletions(-)

diff --git 
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java
index 6cb011b..ab5826b 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java
@@ -101,6 +101,9 @@ public class Wait {
       assertTrue(DEFAULT_FAILURE_MESSAGE, () -> !condition.isSatisfied(), 
duration, sleep);
    }
 
+   public static void assertTrue(Condition condition, final long duration) {
+      assertTrue(DEFAULT_FAILURE_MESSAGE, condition, duration, SLEEP_MILLIS);
+   }
 
    public static void assertTrue(String failureMessage, Condition condition) {
       assertTrue(failureMessage, condition, MAX_WAIT_MILLIS);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java
index 5e498af..9da89d0 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java
@@ -18,23 +18,23 @@ package 
org.apache.activemq.artemis.core.server.cluster.qourum;
 
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
 import 
org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
 import org.apache.activemq.artemis.core.client.impl.Topology;
-import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
 import org.apache.activemq.artemis.core.server.NodeManager;
+import org.jboss.logging.Logger;
 
 public class SharedNothingBackupQuorum implements Quorum, 
SessionFailureListener {
 
-   private TransportConfiguration liveTransportConfiguration;
+   private static final Logger LOGGER = 
Logger.getLogger(SharedNothingBackupQuorum.class);
 
    public enum BACKUP_ACTIVATION {
       FAIL_OVER, FAILURE_REPLICATING, ALREADY_REPLICATING, STOP;
@@ -46,7 +46,6 @@ public class SharedNothingBackupQuorum implements Quorum, 
SessionFailureListener
 
    private final NodeManager nodeManager;
 
-   private final StorageManager storageManager;
    private final ScheduledExecutorService scheduledPool;
    private final int quorumSize;
 
@@ -56,8 +55,6 @@ public class SharedNothingBackupQuorum implements Quorum, 
SessionFailureListener
 
    private final Object voteGuard = new Object();
 
-   private CountDownLatch latch;
-
    private ClientSessionFactoryInternal sessionFactory;
 
    private CoreRemotingConnection connection;
@@ -67,6 +64,14 @@ public class SharedNothingBackupQuorum implements Quorum, 
SessionFailureListener
    private volatile boolean stopped = false;
 
    private final int quorumVoteWait;
+
+   private volatile BACKUP_ACTIVATION signal;
+
+   private ScheduledFuture<?> decisionGuard;
+
+   private CountDownLatch latch;
+
+   private final Object onConnectionFailureGuard = new Object();
    /**
     * This is a safety net in case the live sends the first {@link 
ReplicationLiveIsStoppingMessage}
     * with code {@link 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage.LiveStopping#STOP_CALLED}
 and crashes before sending the second with
@@ -76,15 +81,13 @@ public class SharedNothingBackupQuorum implements Quorum, 
SessionFailureListener
     */
    public static final int WAIT_TIME_AFTER_FIRST_LIVE_STOPPING_MSG = 60;
 
-   public SharedNothingBackupQuorum(StorageManager storageManager,
-                                    NodeManager nodeManager,
+   public SharedNothingBackupQuorum(NodeManager nodeManager,
                                     ScheduledExecutorService scheduledPool,
                                     NetworkHealthCheck networkHealthCheck,
                                     int quorumSize,
                                     int voteRetries,
                                     long voteRetryWait,
                                     int quorumVoteWait) {
-      this.storageManager = storageManager;
       this.scheduledPool = scheduledPool;
       this.quorumSize = quorumSize;
       this.latch = new CountDownLatch(1);
@@ -95,29 +98,24 @@ public class SharedNothingBackupQuorum implements Quorum, 
SessionFailureListener
       this.quorumVoteWait = quorumVoteWait;
    }
 
-   private volatile BACKUP_ACTIVATION signal;
-
-   /**
-    * safety parameter to make _sure_ we get out of await()
-    */
-   private static final int LATCH_TIMEOUT = 30;
-
-   private final Object decisionGuard = new Object();
-
    @Override
    public String getName() {
       return "SharedNothingBackupQuorum";
    }
 
-   public void decideOnAction(Topology topology) {
-      //we may get called via multiple paths so need to guard
-      synchronized (decisionGuard) {
+   private void onConnectionFailure() {
+      //we may get called as sessionFactory or connection listener
+      synchronized (onConnectionFailureGuard) {
          if (signal == BACKUP_ACTIVATION.FAIL_OVER) {
+            LOGGER.debug("Replication connection failure with signal == 
FAIL_OVER: no need to take any action");
             if (networkHealthCheck != null && !networkHealthCheck.check()) {
                signal = BACKUP_ACTIVATION.FAILURE_REPLICATING;
             }
             return;
          }
+         //given that we're going to latch.countDown(), there is no need to 
await any
+         //scheduled task to complete
+         stopForcedFailoverAfterDelay();
          if (!isLiveDown()) {
             //lost connection but don't know if live is down so restart as 
backup as we can't replicate any more
             
ActiveMQServerLogger.LOGGER.restartingAsBackupBasedOnQuorumVoteResults();
@@ -140,16 +138,13 @@ public class SharedNothingBackupQuorum implements Quorum, 
SessionFailureListener
                signal = BACKUP_ACTIVATION.FAILURE_REPLICATING;
             }
          }
+         latch.countDown();
       }
-      latch.countDown();
    }
 
    public void liveIDSet(String liveID) {
       targetServerID = liveID;
       nodeManager.setNodeID(liveID);
-      liveTransportConfiguration = 
quorumManager.getLiveTransportConfiguration(targetServerID);
-      //now we are replicating we can start waiting for disconnect 
notifications so we can fail over
-      // sessionFactory.addFailureListener(this);
    }
 
    @Override
@@ -166,14 +161,12 @@ public class SharedNothingBackupQuorum implements Quorum, 
SessionFailureListener
     */
    @Override
    public void nodeDown(Topology topology, long eventUID, String nodeID) {
-      if (targetServerID.equals(nodeID)) {
-         decideOnAction(topology);
-      }
+      //noop: we are NOT interested on topology info coming from connections 
!= this.connection
    }
 
    @Override
    public void nodeUp(Topology topology) {
-      //noop
+      //noop: we are NOT interested on topology info coming from connections 
!= this.connection
    }
 
    /**
@@ -181,7 +174,7 @@ public class SharedNothingBackupQuorum implements Quorum, 
SessionFailureListener
     */
    @Override
    public void connectionFailed(ActiveMQException exception, boolean 
failedOver) {
-      decideOnAction(sessionFactory.getServerLocator().getTopology());
+      onConnectionFailure();
    }
 
    @Override
@@ -204,10 +197,10 @@ public class SharedNothingBackupQuorum implements Quorum, 
SessionFailureListener
     */
    public void setSessionFactory(final ClientSessionFactoryInternal 
sessionFactory) {
       this.sessionFactory = sessionFactory;
-      this.connection = (CoreRemotingConnection) 
sessionFactory.getConnection();
-      connection.addFailureListener(this);
       //belts and braces, there are circumstances where the connection 
listener doesn't get called but the session does.
-      sessionFactory.addFailureListener(this);
+      this.sessionFactory.addFailureListener(this);
+      connection = (CoreRemotingConnection) sessionFactory.getConnection();
+      connection.addFailureListener(this);
    }
 
    /**
@@ -217,20 +210,19 @@ public class SharedNothingBackupQuorum implements Quorum, 
SessionFailureListener
     * backup that it should fail-over.
     */
    public synchronized void 
failOver(ReplicationLiveIsStoppingMessage.LiveStopping finalMessage) {
-      removeListener();
+      removeListeners();
       signal = BACKUP_ACTIVATION.FAIL_OVER;
-      if (finalMessage == 
ReplicationLiveIsStoppingMessage.LiveStopping.FAIL_OVER) {
-         latch.countDown();
-      }
-      if (finalMessage == 
ReplicationLiveIsStoppingMessage.LiveStopping.STOP_CALLED) {
-         final CountDownLatch localLatch = latch;
-         scheduledPool.schedule(new Runnable() {
-            @Override
-            public void run() {
-               localLatch.countDown();
-            }
-
-         }, WAIT_TIME_AFTER_FIRST_LIVE_STOPPING_MSG, TimeUnit.SECONDS);
+      switch (finalMessage) {
+
+         case STOP_CALLED:
+            scheduleForcedFailoverAfterDelay(latch);
+            break;
+         case FAIL_OVER:
+            stopForcedFailoverAfterDelay();
+            latch.countDown();
+            break;
+         default:
+            LOGGER.errorf("unsupported LiveStopping type: %s", finalMessage);
       }
    }
 
@@ -244,10 +236,13 @@ public class SharedNothingBackupQuorum implements Quorum, 
SessionFailureListener
       latch.countDown();
    }
 
-   private void removeListener() {
+   private void removeListeners() {
       if (connection != null) {
          connection.removeFailureListener(this);
       }
+      if (sessionFactory != null) {
+         sessionFactory.removeFailureListener(this);
+      }
    }
 
    /**
@@ -271,13 +266,38 @@ public class SharedNothingBackupQuorum implements Quorum, 
SessionFailureListener
     * @param explicitSignal the state we want to set the quorum manager to 
return
     */
    public synchronized void causeExit(BACKUP_ACTIVATION explicitSignal) {
+      stopForcedFailoverAfterDelay();
       stopped = true;
-      removeListener();
+      removeListeners();
       this.signal = explicitSignal;
       latch.countDown();
    }
 
+   private synchronized void scheduleForcedFailoverAfterDelay(CountDownLatch 
signalChanged) {
+      if (decisionGuard != null) {
+         if (decisionGuard.isDone()) {
+            LOGGER.warn("A completed force failover task wasn't cleaned-up: a 
new one will be scheduled");
+         } else if (!decisionGuard.cancel(false)) {
+            LOGGER.warn("Failed to cancel an existing uncompleted force 
failover task: a new one will be scheduled anyway");
+         } else {
+            LOGGER.warn("Cancelled an existing uncompleted force failover 
task: a new one will be scheduled in its place");
+         }
+      }
+      decisionGuard = scheduledPool.schedule(signalChanged::countDown,
+                                             
WAIT_TIME_AFTER_FIRST_LIVE_STOPPING_MSG, TimeUnit.SECONDS);
+   }
+
+   private synchronized boolean stopForcedFailoverAfterDelay() {
+      if (decisionGuard == null) {
+         return false;
+      }
+      final boolean stopped = decisionGuard.cancel(false);
+      decisionGuard = null;
+      return stopped;
+   }
+
    public synchronized void reset() {
+      stopForcedFailoverAfterDelay();
       latch = new CountDownLatch(1);
    }
 
@@ -287,13 +307,21 @@ public class SharedNothingBackupQuorum implements Quorum, 
SessionFailureListener
     * @return the voting decision
     */
    private boolean isLiveDown() {
-      //lets assume live is not down
-      Boolean decision = false;
-      int voteAttempts = 0;
-      int size = quorumSize == -1 ? quorumManager.getMaxClusterSize() : 
quorumSize;
+      // lets assume live is not down
+      if (stopped) {
+         return false;
+      }
+      final int size = quorumSize == -1 ? quorumManager.getMaxClusterSize() : 
quorumSize;
 
       synchronized (voteGuard) {
-         while (!stopped && voteAttempts++ < voteRetries) {
+         for (int voteAttempts = 0; voteAttempts < voteRetries && !stopped; 
voteAttempts++) {
+            if (voteAttempts > 0) {
+               try {
+                  voteGuard.wait(voteRetryWait);
+               } catch (InterruptedException e) {
+                  //nothing to do here
+               }
+            }
             //the live is dead so lets vote for quorum
             QuorumVoteServerConnect quorumVote = new 
QuorumVoteServerConnect(size, targetServerID);
 
@@ -308,19 +336,11 @@ public class SharedNothingBackupQuorum implements Quorum, 
SessionFailureListener
 
             quorumManager.voteComplete(quorumVote);
 
-            decision = quorumVote.getDecision();
-
-            if (decision) {
-               return decision;
-            }
-            try {
-               voteGuard.wait(voteRetryWait);
-            } catch (InterruptedException e) {
-               //nothing to do here
+            if (quorumVote.getDecision()) {
+               return true;
             }
          }
       }
-
-      return decision;
+      return false;
    }
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
index 587b8f0..bbfc2b3 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java
@@ -131,7 +131,7 @@ public final class SharedNothingBackupActivation extends 
Activation {
             logger.trace("Entered a synchronized");
             if (closed)
                return;
-            backupQuorum = new 
SharedNothingBackupQuorum(activeMQServer.getStorageManager(), 
activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), 
networkHealthCheck, replicaPolicy.getQuorumSize(), 
replicaPolicy.getVoteRetries(), replicaPolicy.getVoteRetryWait(), 
replicaPolicy.getQuorumVoteWait());
+            backupQuorum = new 
SharedNothingBackupQuorum(activeMQServer.getNodeManager(), 
activeMQServer.getScheduledPool(), networkHealthCheck, 
replicaPolicy.getQuorumSize(), replicaPolicy.getVoteRetries(), 
replicaPolicy.getVoteRetryWait(), replicaPolicy.getQuorumVoteWait());
             
activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum);
             
activeMQServer.getClusterManager().getQuorumManager().registerQuorumHandler(new 
ServerConnectVoteHandler(activeMQServer));
          }
diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml
index 2482a8d..1eb888c 100644
--- a/tests/smoke-tests/pom.xml
+++ b/tests/smoke-tests/pom.xml
@@ -307,6 +307,88 @@
                      </args>
                   </configuration>
                </execution>
+               <!-- START JmxReplicatedMultipleFailbackTest -->
+               <execution>
+                  <phase>test-compile</phase>
+                  <id>create-replicated-failback-master1</id>
+                  <goals>
+                     <goal>create</goal>
+                  </goals>
+                  <configuration>
+                     <!-- this makes it easier in certain envs -->
+                     
<configuration>${basedir}/target/classes/servers/replicated-failback-master1</configuration>
+                     <allowAnonymous>true</allowAnonymous>
+                     <user>admin</user>
+                     <password>admin</password>
+                     
<instance>${basedir}/target/replicated-failback-master1</instance>
+                     <args>
+                        <!-- this is needed to run the server remotely -->
+                        <arg>--java-options</arg>
+                        <arg>-Djava.rmi.server.hostname=localhost</arg>
+                     </args>
+                  </configuration>
+               </execution>
+               <execution>
+                  <phase>test-compile</phase>
+                  <id>create-replicated-failback-master2</id>
+                  <goals>
+                     <goal>create</goal>
+                  </goals>
+                  <configuration>
+                     <!-- this makes it easier in certain envs -->
+                     
<configuration>${basedir}/target/classes/servers/replicated-failback-master2</configuration>
+                     <allowAnonymous>true</allowAnonymous>
+                     <user>admin</user>
+                     <password>admin</password>
+                     
<instance>${basedir}/target/replicated-failback-master2</instance>
+                     <args>
+                        <!-- this is needed to run the server remotely -->
+                        <arg>--java-options</arg>
+                        <arg>-Djava.rmi.server.hostname=localhost</arg>
+                     </args>
+                  </configuration>
+               </execution>
+               <execution>
+                  <phase>test-compile</phase>
+                  <id>create-replicated-failback-master3</id>
+                  <goals>
+                     <goal>create</goal>
+                  </goals>
+                  <configuration>
+                     <!-- this makes it easier in certain envs -->
+                     
<configuration>${basedir}/target/classes/servers/replicated-failback-master3</configuration>
+                     <allowAnonymous>true</allowAnonymous>
+                     <user>admin</user>
+                     <password>admin</password>
+                     
<instance>${basedir}/target/replicated-failback-master3</instance>
+                     <args>
+                        <!-- this is needed to run the server remotely -->
+                        <arg>--java-options</arg>
+                        <arg>-Djava.rmi.server.hostname=localhost</arg>
+                     </args>
+                  </configuration>
+               </execution>
+               <execution>
+                  <phase>test-compile</phase>
+                  <id>create-replicated-failback-slave1</id>
+                  <goals>
+                     <goal>create</goal>
+                  </goals>
+                  <configuration>
+                     <!-- this makes it easier in certain envs -->
+                     
<configuration>${basedir}/target/classes/servers/replicated-failback-slave1</configuration>
+                     <allowAnonymous>true</allowAnonymous>
+                     <user>admin</user>
+                     <password>admin</password>
+                     
<instance>${basedir}/target/replicated-failback-slave1</instance>
+                     <args>
+                        <!-- this is needed to run the server remotely -->
+                        <arg>--java-options</arg>
+                        <arg>-Djava.rmi.server.hostname=localhost</arg>
+                     </args>
+                  </configuration>
+               </execution>
+               <!-- END JmxReplicatedMultipleFailbackTest -->
                <execution>
                   <phase>test-compile</phase>
                   <id>create-paging</id>
diff --git 
a/tests/smoke-tests/src/main/resources/servers/replicated-failback-master1/broker.xml
 
b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master1/broker.xml
new file mode 100644
index 0000000..656eac8
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master1/broker.xml
@@ -0,0 +1,137 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+--><configuration xmlns="urn:activemq" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
+
+   <core xmlns="urn:activemq:core">
+
+      <name>master1</name>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      <large-messages-directory>./data/largemessages</large-messages-directory>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <ha-policy>
+         <replication>
+            <master>
+               <group-name>a</group-name>
+               <check-for-live-server>true</check-for-live-server>
+               <vote-on-replication-failure>true</vote-on-replication-failure>
+            </master>
+         </replication>
+      </ha-policy>
+
+      <connectors>
+         <!-- Connector used to be announced through cluster connections and 
notifications -->
+         <connector name="artemis">tcp://localhost:61616</connector>
+         <connector name="master2">tcp://localhost:61716</connector>
+         <connector name="master3">tcp://localhost:61816</connector>
+         <connector name="slave1">tcp://localhost:61916</connector>
+      </connectors>
+
+      <!-- Acceptors -->
+      <acceptors>
+         <acceptor name="artemis">tcp://localhost:61616</acceptor>
+      </acceptors>
+
+      <cluster-user>admin</cluster-user>
+
+      <cluster-password>password</cluster-password>
+
+      <cluster-connections>
+         <cluster-connection name="my-cluster">
+            <connector-ref>artemis</connector-ref>
+            <message-load-balancing>OFF</message-load-balancing>
+            <max-hops>1</max-hops>
+            <static-connectors>
+               <connector-ref>slave1</connector-ref>
+               <connector-ref>master2</connector-ref>
+               <connector-ref>master3</connector-ref>
+            </static-connectors>
+         </cluster-connection>
+      </cluster-connections>
+
+      <!-- Other config -->
+
+      <security-settings>
+         <!--security for example queue-->
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="amq, guest"/>
+            <permission type="deleteNonDurableQueue" roles="amq, guest"/>
+            <permission type="createDurableQueue" roles="amq, guest"/>
+            <permission type="deleteDurableQueue" roles="amq, guest"/>
+            <permission type="createAddress" roles="amq, guest"/>
+            <permission type="deleteAddress" roles="amq, guest"/>
+            <permission type="consume" roles="amq, guest"/>
+            <permission type="browse" roles="amq, guest"/>
+            <permission type="send" roles="amq, guest"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="amq"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to 
be auto-create -->
+         <address-setting match="activemq.management#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+            <auto-create-jms-queues>true</auto-create-jms-queues>
+            <auto-create-jms-topics>true</auto-create-jms-topics>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>10MB</max-size-bytes>
+            <page-size-bytes>1MB</page-size-bytes>
+
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+            <auto-create-jms-queues>true</auto-create-jms-queues>
+            <auto-create-jms-topics>true</auto-create-jms-topics>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="exampleTopic">
+            <multicast>
+            </multicast>
+         </address>
+         <address name="exampleQueue">
+            <anycast>
+               <queue name="exampleQueue"/>
+            </anycast>
+         </address>
+      </addresses>
+   </core>
+</configuration>
diff --git 
a/tests/smoke-tests/src/main/resources/servers/replicated-failback-master1/management.xml
 
b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master1/management.xml
new file mode 100644
index 0000000..576f1e5
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master1/management.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<management-context xmlns="http://activemq.org/schema">
+   <connector connector-port="10099" connector-host="localhost"/>
+</management-context>
\ No newline at end of file
diff --git 
a/tests/smoke-tests/src/main/resources/servers/replicated-failback-master2/broker.xml
 
b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master2/broker.xml
new file mode 100644
index 0000000..4a692ae
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master2/broker.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+--><configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
+
+   <core xmlns="urn:activemq:core">
+
+      <name>master2</name>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      <large-messages-directory>./data/largemessages</large-messages-directory>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <ha-policy>
+         <replication>
+            <master>
+               <group-name>b</group-name>
+               <check-for-live-server>true</check-for-live-server>
+               <vote-on-replication-failure>true</vote-on-replication-failure>
+            </master>
+         </replication>
+      </ha-policy>
+
+      <connectors>
+         <!-- Connector used to be announced through cluster connections and notifications -->
+         <connector name="artemis">tcp://localhost:61716</connector>
+         <connector name="master1">tcp://localhost:61616</connector>
+         <connector name="master3">tcp://localhost:61816</connector>
+         <!-- slave1 accepts on 61916; 61816 is master3's acceptor, so using it here would never reach slave1 -->
+         <connector name="slave1">tcp://localhost:61916</connector>
+      </connectors>
+
+
+      <!-- Acceptors -->
+      <acceptors>
+         <acceptor name="artemis">tcp://localhost:61716</acceptor>
+      </acceptors>
+
+      <cluster-user>admin</cluster-user>
+
+      <cluster-password>password</cluster-password>
+
+      <cluster-connections>
+         <cluster-connection name="my-cluster">
+            <connector-ref>artemis</connector-ref>
+            <message-load-balancing>OFF</message-load-balancing>
+            <max-hops>1</max-hops>
+            <static-connectors>
+               <connector-ref>master3</connector-ref>
+               <connector-ref>master1</connector-ref>
+               <connector-ref>slave1</connector-ref>
+            </static-connectors>
+         </cluster-connection>
+      </cluster-connections>
+
+      <!-- Other config -->
+
+      <security-settings>
+         <!--security for example queue-->
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="amq, guest"/>
+            <permission type="deleteNonDurableQueue" roles="amq, guest"/>
+            <permission type="createDurableQueue" roles="amq, guest"/>
+            <permission type="deleteDurableQueue" roles="amq, guest"/>
+            <permission type="createAddress" roles="amq, guest"/>
+            <permission type="deleteAddress" roles="amq, guest"/>
+            <permission type="consume" roles="amq, guest"/>
+            <permission type="browse" roles="amq, guest"/>
+            <permission type="send" roles="amq, guest"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="amq"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to 
be auto-create -->
+         <address-setting match="activemq.management#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+            <auto-create-jms-queues>true</auto-create-jms-queues>
+            <auto-create-jms-topics>true</auto-create-jms-topics>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- each address may hold up to 10MB before paging; -1 would leave only the global-max-size as limit -->
+            <max-size-bytes>10MB</max-size-bytes>
+            <page-size-bytes>1MB</page-size-bytes>
+
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+            <auto-create-jms-queues>true</auto-create-jms-queues>
+            <auto-create-jms-topics>true</auto-create-jms-topics>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="exampleTopic">
+            <multicast>
+            </multicast>
+         </address>
+         <address name="exampleQueue">
+            <anycast>
+               <queue name="exampleQueue"/>
+            </anycast>
+         </address>
+      </addresses>
+   </core>
+</configuration>
diff --git 
a/tests/smoke-tests/src/main/resources/servers/replicated-failback-master2/management.xml
 
b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master2/management.xml
new file mode 100644
index 0000000..14bbaf2
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master2/management.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<management-context xmlns="http://activemq.org/schema">
+   <connector connector-port="10199" connector-host="localhost"/>
+</management-context>
\ No newline at end of file
diff --git 
a/tests/smoke-tests/src/main/resources/servers/replicated-failback-master3/broker.xml
 
b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master3/broker.xml
new file mode 100644
index 0000000..c9852ab
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master3/broker.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+--><configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
+
+   <core xmlns="urn:activemq:core">
+
+      <name>master3</name>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      <large-messages-directory>./data/largemessages</large-messages-directory>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <ha-policy>
+         <replication>
+            <master>
+               <group-name>c</group-name>
+               <check-for-live-server>true</check-for-live-server>
+               <vote-on-replication-failure>true</vote-on-replication-failure>
+            </master>
+         </replication>
+      </ha-policy>
+
+      <connectors>
+         <!-- Connector used to be announced through cluster connections and 
notifications -->
+         <connector name="artemis">tcp://localhost:61816</connector>
+         <connector name="master1">tcp://localhost:61616</connector>
+         <connector name="master2">tcp://localhost:61716</connector>
+         <connector name="slave1">tcp://localhost:61916</connector>
+      </connectors>
+
+
+      <!-- Acceptors -->
+      <acceptors>
+         <acceptor name="artemis">tcp://localhost:61816</acceptor>
+      </acceptors>
+
+      <cluster-user>admin</cluster-user>
+
+      <cluster-password>password</cluster-password>
+
+      <cluster-connections>
+         <cluster-connection name="my-cluster">
+            <connector-ref>artemis</connector-ref>
+            <message-load-balancing>OFF</message-load-balancing>
+            <max-hops>1</max-hops>
+            <static-connectors>
+               <connector-ref>master2</connector-ref>
+               <connector-ref>master1</connector-ref>
+               <connector-ref>slave1</connector-ref>
+            </static-connectors>
+         </cluster-connection>
+      </cluster-connections>
+
+      <!-- Other config -->
+
+      <security-settings>
+         <!--security for example queue-->
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="amq, guest"/>
+            <permission type="deleteNonDurableQueue" roles="amq, guest"/>
+            <permission type="createDurableQueue" roles="amq, guest"/>
+            <permission type="deleteDurableQueue" roles="amq, guest"/>
+            <permission type="createAddress" roles="amq, guest"/>
+            <permission type="deleteAddress" roles="amq, guest"/>
+            <permission type="consume" roles="amq, guest"/>
+            <permission type="browse" roles="amq, guest"/>
+            <permission type="send" roles="amq, guest"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="amq"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to 
be auto-create -->
+         <address-setting match="activemq.management#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+            <auto-create-jms-queues>true</auto-create-jms-queues>
+            <auto-create-jms-topics>true</auto-create-jms-topics>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- each address may hold up to 10MB before paging; -1 would leave only the global-max-size as limit -->
+            <max-size-bytes>10MB</max-size-bytes>
+            <page-size-bytes>1MB</page-size-bytes>
+
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+            <auto-create-jms-queues>true</auto-create-jms-queues>
+            <auto-create-jms-topics>true</auto-create-jms-topics>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="exampleTopic">
+            <multicast>
+            </multicast>
+         </address>
+         <address name="exampleQueue">
+            <anycast>
+               <queue name="exampleQueue"/>
+            </anycast>
+         </address>
+      </addresses>
+   </core>
+</configuration>
diff --git 
a/tests/smoke-tests/src/main/resources/servers/replicated-failback-master3/management.xml
 
b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master3/management.xml
new file mode 100644
index 0000000..9de3c59
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master3/management.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<management-context xmlns="http://activemq.org/schema">
+   <connector connector-port="10299" connector-host="localhost"/>
+</management-context>
\ No newline at end of file
diff --git 
a/tests/smoke-tests/src/main/resources/servers/replicated-failback-slave1/broker.xml
 
b/tests/smoke-tests/src/main/resources/servers/replicated-failback-slave1/broker.xml
new file mode 100644
index 0000000..601f3fd
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/replicated-failback-slave1/broker.xml
@@ -0,0 +1,137 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+--><configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd">
+
+   <core xmlns="urn:activemq:core">
+
+      <name>slave1</name>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      <large-messages-directory>./data/largemessages</large-messages-directory>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <ha-policy>
+         <replication>
+            <slave>
+               <group-name>a</group-name>
+               <allow-failback>true</allow-failback>
+            </slave>
+         </replication>
+      </ha-policy>
+
+      <connectors>
+         <!-- Connector used to be announced through cluster connections and 
notifications -->
+         <connector name="artemis">tcp://localhost:61916</connector>
+         <connector name="master1">tcp://localhost:61616</connector>
+         <connector name="master2">tcp://localhost:61716</connector>
+         <connector name="master3">tcp://localhost:61816</connector>
+      </connectors>
+
+
+      <!-- Acceptors -->
+      <acceptors>
+         <acceptor name="artemis">tcp://localhost:61916</acceptor>
+      </acceptors>
+
+      <cluster-user>admin</cluster-user>
+
+      <cluster-password>password</cluster-password>
+
+      <cluster-connections>
+         <cluster-connection name="my-cluster">
+            <connector-ref>artemis</connector-ref>
+            <message-load-balancing>OFF</message-load-balancing>
+            <max-hops>1</max-hops>
+            <static-connectors>
+               <connector-ref>master1</connector-ref>
+               <connector-ref>master2</connector-ref>
+               <connector-ref>master3</connector-ref>
+            </static-connectors>
+         </cluster-connection>
+      </cluster-connections>
+
+      <!-- Other config -->
+
+      <security-settings>
+         <!--security for example queue-->
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="amq, guest"/>
+            <permission type="deleteNonDurableQueue" roles="amq, guest"/>
+            <permission type="createDurableQueue" roles="amq, guest"/>
+            <permission type="deleteDurableQueue" roles="amq, guest"/>
+            <permission type="createAddress" roles="amq, guest"/>
+            <permission type="deleteAddress" roles="amq, guest"/>
+            <permission type="consume" roles="amq, guest"/>
+            <permission type="browse" roles="amq, guest"/>
+            <permission type="send" roles="amq, guest"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="amq"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to 
be auto-create -->
+         <address-setting match="activemq.management#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+            <auto-create-jms-queues>true</auto-create-jms-queues>
+            <auto-create-jms-topics>true</auto-create-jms-topics>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- each address may hold up to 10MB before paging; -1 would leave only the global-max-size as limit -->
+            <max-size-bytes>10MB</max-size-bytes>
+            <page-size-bytes>1MB</page-size-bytes>
+
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+            <auto-create-jms-queues>true</auto-create-jms-queues>
+            <auto-create-jms-topics>true</auto-create-jms-topics>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="exampleTopic">
+            <multicast>
+            </multicast>
+         </address>
+         <address name="exampleQueue">
+            <anycast>
+               <queue name="exampleQueue"/>
+            </anycast>
+         </address>
+      </addresses>
+   </core>
+</configuration>
diff --git 
a/tests/smoke-tests/src/main/resources/servers/replicated-failback-slave1/management.xml
 
b/tests/smoke-tests/src/main/resources/servers/replicated-failback-slave1/management.xml
new file mode 100644
index 0000000..f9ae2f9
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/replicated-failback-slave1/management.xml
@@ -0,0 +1,20 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<management-context xmlns="http://activemq.org/schema">
+   <connector connector-port="10399" connector-host="localhost"/>
+</management-context>
\ No newline at end of file
diff --git 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SmokeTestBase.java
 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SmokeTestBase.java
index e6c2a6e..e35ffa5 100644
--- 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SmokeTestBase.java
+++ 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SmokeTestBase.java
@@ -18,14 +18,15 @@
 package org.apache.activemq.artemis.tests.smoke.common;
 
 import java.io.File;
-import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.util.ServerUtil;
 import org.junit.After;
 
 public class SmokeTestBase extends ActiveMQTestBase {
-   ArrayList<Process> processes = new ArrayList();
+   Set<Process> processes = new HashSet<>();
 
    public static final String basedir = System.getProperty("basedir");
 
@@ -38,13 +39,23 @@ public class SmokeTestBase extends ActiveMQTestBase {
             e.printStackTrace();
          }
       }
+      processes.clear();
    }
 
-   public String getServerLocation(String serverName) {
+   /**
+    * Kills a single server process and stops tracking it in {@code processes}.
+    * <p>
+    * Any {@link Throwable} raised by {@code ServerUtil.killServer} is printed and not rethrown.
+    *
+    * @param process the server process to kill
+    */
+   public void killServer(Process process) {
+      // untrack first; presumably so the @After cleanup does not try to kill it a second time - TODO confirm
+      processes.remove(process);
+      try {
+         ServerUtil.killServer(process);
+      } catch (Throwable e) {
+         e.printStackTrace();
+      }
+   }
+
+   public static String getServerLocation(String serverName) {
       return basedir + "/target/" + serverName;
    }
 
-   public void cleanupData(String serverName) {
+   public static void cleanupData(String serverName) {
       String location = getServerLocation(serverName);
       deleteDirectory(new File(location, "data"));
    }
diff --git 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/jmxmultiplefailback/ReplicatedMultipleFailbackTest.java
 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/jmxmultiplefailback/ReplicatedMultipleFailbackTest.java
new file mode 100644
index 0000000..f354fe6
--- /dev/null
+++ 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/jmxmultiplefailback/ReplicatedMultipleFailbackTest.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.jmxmultiplefailback;
+
+import javax.json.Json;
+import javax.json.JsonArray;
+import javax.json.JsonObject;
+import javax.json.JsonReader;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import java.io.StringReader;
+import java.net.MalformedURLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
+import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.utils.Wait;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ReplicatedMultipleFailbackTest extends SmokeTestBase {
+
+   private static final Logger LOGGER = 
Logger.getLogger(ReplicatedMultipleFailbackTest.class);
+
+   /**
+    * A {@link Function}-like callback whose body is allowed to throw any {@link Throwable}.
+    *
+    * @param <T> type of the argument handed to the callback
+    * @param <R> type of the value the callback produces
+    */
+   @FunctionalInterface
+   interface ThrowableFunction<T, R> {
+
+      R apply(T input) throws Throwable;
+   }
+
+   /**
+    * Connects to {@code serviceURI} over JMX, creates a {@code controlClass} proxy for {@code objectName}
+    * and runs {@code queryControl} against it. The JMX connection is closed before returning.
+    * <p>
+    * Any {@link Throwable} raised while connecting, querying or closing is handed to {@code onThrowable},
+    * whose result is returned instead.
+    *
+    * @return the query (or fallback) result, empty when that result is {@code null}
+    */
+   private static <C, T> Optional<T> queryControl(JMXServiceURL serviceURI,
+                                                  ObjectName objectName,
+                                                  ThrowableFunction<C, T> queryControl,
+                                                  Class<C> controlClass,
+                                                  Function<Throwable, T> onThrowable) {
+      T outcome;
+      // a single try-with-resources: its catch clause also covers failures of connect() and close()
+      try (JMXConnector connector = JMXConnectorFactory.connect(serviceURI)) {
+         final C proxy = MBeanServerInvocationHandler.newProxyInstance(connector.getMBeanServerConnection(), objectName, controlClass, false);
+         outcome = queryControl.apply(proxy);
+      } catch (Throwable failure) {
+         outcome = onThrowable.apply(failure);
+      }
+      return Optional.ofNullable(outcome);
+   }
+
+   private static Optional<Boolean> isBackup(JMXServiceURL serviceURI, 
ObjectNameBuilder builder) throws Exception {
+      return queryControl(serviceURI, builder.getActiveMQServerObjectName(), 
ActiveMQServerControl::isBackup, ActiveMQServerControl.class, throwable -> 
null);
+   }
+
+   private static Optional<String> getNodeID(JMXServiceURL serviceURI, 
ObjectNameBuilder builder) throws Exception {
+      return queryControl(serviceURI, builder.getActiveMQServerObjectName(), 
ActiveMQServerControl::getNodeID, ActiveMQServerControl.class, throwable -> 
null);
+   }
+
+   private static Optional<String> listNetworkTopology(JMXServiceURL 
serviceURI,
+                                                       ObjectNameBuilder 
builder) throws Exception {
+      return queryControl(serviceURI, builder.getActiveMQServerObjectName(), 
ActiveMQServerControl::listNetworkTopology, ActiveMQServerControl.class, 
throwable -> null);
+   }
+
+   /**
+    * Parses a JSON array of objects carrying {@code "nodeID"}, {@code "live"} and an optional {@code "backup"}.
+    *
+    * @param networkTopologyJson the JSON text; {@code null} or empty yields an empty map
+    * @return a map from node ID to a (live, backup) pair, where backup is {@code null} when absent
+    */
+   private static Map<String, Pair<String, String>> decodeNetworkTopologyJson(String networkTopologyJson) {
+      final boolean nothingToDecode = networkTopologyJson == null || networkTopologyJson.isEmpty();
+      if (nothingToDecode) {
+         return Collections.emptyMap();
+      }
+      try (JsonReader reader = Json.createReader(new StringReader(networkTopologyJson))) {
+         final JsonArray entries = reader.readArray();
+         final int entryCount = entries.size();
+         final Map<String, Pair<String, String>> topology = new HashMap<>(entryCount);
+         for (int index = 0; index < entryCount; index++) {
+            final JsonObject entry = entries.getJsonObject(index);
+            final String id = entry.getString("nodeID");
+            final String liveAddress = entry.getString("live");
+            // "backup" is optional: fall back to null instead of failing
+            final String backupAddress = entry.getString("backup", null);
+            topology.put(id, new Pair<>(liveAddress, backupAddress));
+         }
+         return topology;
+      }
+   }
+
+   /**
+    * Counts the topology entries whose live address is neither {@code null} nor empty.
+    */
+   private static long countMembers(Map<String, Pair<String, String>> networkTopology) {
+      long members = 0;
+      for (Pair<String, String> liveAndBackup : networkTopology.values()) {
+         final String live = liveAndBackup.getA();
+         if (live != null && !live.isEmpty()) {
+            members++;
+         }
+      }
+      return members;
+   }
+
+   /**
+    * Counts every live and every backup address in the topology that is neither {@code null} nor empty.
+    */
+   private static long countNodes(Map<String, Pair<String, String>> networkTopology) {
+      long nodes = 0;
+      for (Pair<String, String> liveAndBackup : networkTopology.values()) {
+         final String live = liveAndBackup.getA();
+         final String backup = liveAndBackup.getB();
+         if (live != null && !live.isEmpty()) {
+            nodes++;
+         }
+         if (backup != null && !backup.isEmpty()) {
+            nodes++;
+         }
+      }
+      return nodes;
+   }
+
+   /**
+    * Decodes {@code networkTopologyJson} and evaluates {@code checkTopology} on the decoded map.
+    *
+    * @return the outcome of the predicate
+    */
+   private static boolean validateNetworkTopology(String networkTopologyJson, Predicate<Map<String, Pair<String, String>>> checkTopology) {
+      return checkTopology.test(decodeNetworkTopologyJson(networkTopologyJson));
+   }
+
+   private static String backupOf(String nodeID, Map<String, Pair<String, 
String>> networkTopology) {
+      return networkTopology.get(nodeID).getB();
+   }
+
+   private static String liveOf(String nodeID, Map<String, Pair<String, 
String>> networkTopology) {
+      return networkTopology.get(nodeID).getA();
+   }
+
+   /**
+    * Builds a predicate that holds when the topology has as many entries as {@code nodeID}
+    * and contains each of the given IDs as a key.
+    */
+   private static Predicate<Map<String, Pair<String, String>>> containsExactNodeIds(String... nodeID) {
+      Objects.requireNonNull(nodeID);
+      return topology -> {
+         if (topology.size() != nodeID.length) {
+            return false;
+         }
+         for (String expectedId : nodeID) {
+            if (!topology.containsKey(expectedId)) {
+               return false;
+            }
+         }
+         return true;
+      };
+   }
+
+   private static Predicate<Map<String, Pair<String, String>>> withMembers(int 
count) {
+      return topology -> countMembers(topology) == count;
+   }
+
+   private static Predicate<Map<String, Pair<String, String>>> withNodes(int 
count) {
+      return topology -> countNodes(topology) == count;
+   }
+
+   /**
+    * Builds a predicate that applies {@code compare} to the backup value registered
+    * for {@code nodeId} in the topology under test.
+    */
+   private static Predicate<Map<String, Pair<String, String>>> withBackup(String nodeId, Predicate<String> compare) {
+      return topology -> {
+         final String backup = backupOf(nodeId, topology);
+         return compare.test(backup);
+      };
+   }
+
+   /**
+    * Builds a predicate that applies {@code compare} to the live value registered
+    * for {@code nodeId} in the topology under test.
+    */
+   private static Predicate<Map<String, Pair<String, String>>> withLive(String nodeId, Predicate<String> compare) {
+      return topology -> {
+         final String live = liveOf(nodeId, topology);
+         return compare.test(live);
+      };
+   }
+
+   // Host and per-broker ports used by the Broker enum to build each JMX service URL
+   private static final String JMX_SERVER_HOSTNAME = "localhost";
+   private static final int JMX_PORT_MASTER_1 = 10099;
+   private static final int JMX_PORT_MASTER_2 = 10199;
+   private static final int JMX_PORT_MASTER_3 = 10299;
+   private static final int JMX_PORT_SLAVE_1 = 10399;
+
+   // Per-broker folder names, handed to startServer and cleanupData by the Broker enum
+   private static final String MASTER_1_DATA_FOLDER = 
"replicated-failback-master1";
+   private static final String MASTER_2_DATA_FOLDER = 
"replicated-failback-master2";
+   private static final String MASTER_3_DATA_FOLDER = 
"replicated-failback-master3";
+   private static final String SLAVE_1_DATA_FOLDER = 
"replicated-failback-slave1";
+
+   // Per-broker port ids handed to SmokeTestBase.startServer, spaced 100 apart
+   // NOTE(review): presumably a port offset for the broker - confirm against SmokeTestBase.startServer
+   private static final int MASTER_1_PORT_ID = 0;
+   private static final int MASTER_2_PORT_ID = MASTER_1_PORT_ID + 100;
+   private static final int MASTER_3_PORT_ID = MASTER_2_PORT_ID + 100;
+   private static final int SLAVE_1_PORT_ID = MASTER_3_PORT_ID + 100;
+
+   /**
+    * The brokers taking part in the test. Each constant carries the JMX service URL,
+    * the object name builder, the data folder and the port id used to start and
+    * query that broker. The constant name is used as the broker name when creating
+    * the {@link ObjectNameBuilder}.
+    */
+   private enum Broker {
+      master1(JMX_PORT_MASTER_1, MASTER_1_DATA_FOLDER, MASTER_1_PORT_ID),
+      master2(JMX_PORT_MASTER_2, MASTER_2_DATA_FOLDER, MASTER_2_PORT_ID),
+      master3(JMX_PORT_MASTER_3, MASTER_3_DATA_FOLDER, MASTER_3_PORT_ID),
+      slave1(JMX_PORT_SLAVE_1, SLAVE_1_DATA_FOLDER, SLAVE_1_PORT_ID);
+
+      final ObjectNameBuilder objectNameBuilder;
+      final String dataFolder;
+      final JMXServiceURL jmxServiceURL;
+      final int portID;
+
+      /**
+       * @param jmxPort    port of the broker's JMX RMI registry on {@code JMX_SERVER_HOSTNAME}
+       * @param dataFolder folder name passed to {@code startServer} and {@code cleanupData}
+       * @param portID     port id passed to {@code startServer}
+       */
+      Broker(int jmxPort, String dataFolder, int portID) {
+         final String serviceUrl = "service:jmx:rmi:///jndi/rmi://" + JMX_SERVER_HOSTNAME + ":" + jmxPort + "/jmxrmi";
+         try {
+            jmxServiceURL = new JMXServiceURL(serviceUrl);
+         } catch (MalformedURLException e) {
+            // a malformed URL can only come from the constants above: fail fast
+            throw new RuntimeException(e);
+         }
+         this.objectNameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), name(), true);
+         this.dataFolder = dataFolder;
+         this.portID = portID;
+      }
+
+      /** Starts this broker through the given test environment. */
+      public Process startServer(SmokeTestBase env, int millisTimeout) throws Exception {
+         return env.startServer(dataFolder, portID, millisTimeout);
+      }
+
+      /** Cleans up the data of this broker's folder. */
+      public void cleanupData() {
+         ReplicatedMultipleFailbackTest.cleanupData(dataFolder);
+      }
+
+      /** Queries this broker over JMX for its backup state. */
+      public Optional<Boolean> isBackup() throws Exception {
+         return ReplicatedMultipleFailbackTest.isBackup(jmxServiceURL, objectNameBuilder);
+      }
+
+      /** Queries this broker over JMX for its node id. */
+      public Optional<String> getNodeID() throws Exception {
+         return ReplicatedMultipleFailbackTest.getNodeID(jmxServiceURL, objectNameBuilder);
+      }
+
+      /** Queries this broker over JMX for its view of the network topology. */
+      public Optional<String> listNetworkTopology() throws Exception {
+         return ReplicatedMultipleFailbackTest.listNetworkTopology(jmxServiceURL, objectNameBuilder);
+      }
+   }
+
+   /**
+    * Cleans up the data of every broker and disables the check thread before each test.
+    */
+   @Before
+   public void before() {
+      for (Broker broker : Broker.values()) {
+         broker.cleanupData();
+      }
+      disableCheckThread();
+   }
+
+   /**
+    * Starts master1, master2, master3 and slave1, checks the topology reported by
+    * every broker, then repeats {@code failbackRetries} times: kill master1, wait for
+    * slave1 to stop being a backup, restart master1, wait for slave1 to be a backup
+    * again and master1 to be live, and verify that every broker reports the initial
+    * topology (same live and backup urls for master1's node id).
+    */
+   @Test
+   public void testMultipleFailback() throws Exception {
+      LOGGER.infof("TEST BOOTSTRAPPING START: STARTING brokers %s", 
Arrays.toString(Broker.values()));
+      final int failbackRetries = 10;
+      // 30 seconds, expressed in milliseconds
+      final int timeout = (int) TimeUnit.SECONDS.toMillis(30);
+      // only master1's process handle is kept: it is the one killed and restarted below
+      Process master1 = Broker.master1.startServer(this, timeout);
+      // a missing isBackup answer defaults to "backup", so the condition is not satisfied yet
+      Wait.assertTrue(() -> !Broker.master1.isBackup().orElse(true), timeout);
+      Broker.master2.startServer(this, timeout);
+      Wait.assertTrue(() -> !Broker.master2.isBackup().orElse(true), timeout);
+      Broker.master3.startServer(this, timeout);
+      Wait.assertTrue(() -> !Broker.master3.isBackup().orElse(true), timeout);
+      // NOTE(review): a 0 timeout presumably skips waiting for startup - confirm against SmokeTestBase.startServer
+      Broker.slave1.startServer(this, 0);
+      Wait.assertTrue(() -> Broker.slave1.isBackup().orElse(false), timeout);
+
+      final String nodeIDlive1 = Broker.master1.getNodeID().get();
+      final String nodeIDlive2 = Broker.master2.getNodeID().get();
+      final String nodeIDlive3 = Broker.master3.getNodeID().get();
+
+      // every broker must report exactly the 3 node ids, with 3 live members and 4 nodes
+      // overall, and a non-null live and backup for master1's node id
+      for (Broker broker : Broker.values()) {
+         LOGGER.infof("CHECKING NETWORK TOPOLOGY FOR %s", broker);
+         Wait.assertTrue(() -> 
validateNetworkTopology(broker.listNetworkTopology().orElse(""),
+                                                       
containsExactNodeIds(nodeIDlive1, nodeIDlive2, nodeIDlive3)
+                                                          
.and(withLive(nodeIDlive1, Objects::nonNull))
+                                                          
.and(withBackup(nodeIDlive1, Objects::nonNull))
+                                                          .and(withMembers(3))
+                                                          .and(withNodes(4))), 
timeout);
+      }
+
+      // remember the backup and live urls of master1's node id: they are the reference
+      // values for the checks performed on each failover/failback round
+      final String urlSlave1 = backupOf(nodeIDlive1, 
decodeNetworkTopologyJson(Broker.slave1.listNetworkTopology().get()));
+      Assert.assertNotNull(urlSlave1);
+      final String urlMaster1 = liveOf(nodeIDlive1, 
decodeNetworkTopologyJson(Broker.master1.listNetworkTopology().get()));
+      Assert.assertNotNull(urlMaster1);
+      Assert.assertNotEquals(urlMaster1, urlSlave1);
+
+      LOGGER.infof("Node ID live 1 is %s", nodeIDlive1);
+      LOGGER.infof("Node ID live 2 is %s", nodeIDlive2);
+      LOGGER.infof("Node ID live 3 is %s", nodeIDlive3);
+
+      LOGGER.infof("%s has url: %s", Broker.master1, urlMaster1);
+      LOGGER.infof("%s has url: %s", Broker.slave1, urlSlave1);
+
+      LOGGER.info("BOOTSTRAPPING ENDED: READ nodeIds and master1/slave1 urls");
+
+      for (int i = 0; i < failbackRetries; i++) {
+         LOGGER.infof("START TEST %d", i + 1);
+         LOGGER.infof("KILLING master1");
+         killServer(master1);
+         // wait until slave1 became live
+         Wait.assertTrue(() -> !Broker.slave1.isBackup().orElse(true), 
timeout);
+         LOGGER.info("slave1 is LIVE");
+         LOGGER.info("VALIDATE TOPOLOGY OF ALIVE BROKERS");
+         // with master1 down, the surviving brokers must report slave1's url as the live of
+         // master1's node id, no backup for it, and only 3 nodes overall
+         Stream.of(Broker.master2, Broker.master3, Broker.slave1).forEach(
+            broker -> Wait.assertTrue(() -> 
validateNetworkTopology(broker.listNetworkTopology().orElse(""),
+                                                                    
containsExactNodeIds(nodeIDlive1, nodeIDlive2, nodeIDlive3)
+                                                                       
.and(withLive(nodeIDlive1, urlSlave1::equals))
+                                                                       
.and(withBackup(nodeIDlive1, Objects::isNull))
+                                                                       
.and(withMembers(3))
+                                                                       
.and(withNodes(3))), timeout)
+         );
+         // restart master1
+         LOGGER.info("STARTING master1");
+         master1 = Broker.master1.startServer(this, 0);
+         // failback: slave1 must report being a backup again and master1 must report being live
+         Wait.assertTrue(() -> Broker.slave1.isBackup().orElse(false), 
timeout);
+         LOGGER.info("slave1 is BACKUP");
+         Wait.assertTrue(() -> !Broker.master1.isBackup().orElse(true), 
timeout);
+         LOGGER.info("master1 is LIVE");
+         // every broker must report the initial topology again: master1's url as live and
+         // slave1's url as backup of master1's node id, 3 members and 4 nodes
+         for (Broker broker : Broker.values()) {
+            LOGGER.infof("CHECKING NETWORK TOPOLOGY FOR %s", broker);
+            Wait.assertTrue(() -> 
validateNetworkTopology(broker.listNetworkTopology().orElse(""),
+                                                          
containsExactNodeIds(nodeIDlive1, nodeIDlive2, nodeIDlive3)
+                                                             
.and(withLive(nodeIDlive1, urlMaster1::equals))
+                                                             
.and(withBackup(nodeIDlive1, urlSlave1::equals))
+                                                             
.and(withMembers(3))
+                                                             
.and(withNodes(4))), timeout);
+         }
+      }
+      LOGGER.info("TEST COMPLETED");
+   }
+}
+

Reply via email to