This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 007489597b ARTEMIS-5925 Support Star Mirror Configuration on Lock 
Coordinator
007489597b is described below

commit 007489597b5ac9fb7477b5d29edec8d4184f1a92
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Mar 2 10:02:16 2026 -0500

    ARTEMIS-5925 Support Star Mirror Configuration on Lock Coordinator
    
    Disclaimer: The test StarMirrorSingleAcceptorRunningTest was written with 
the help
    from Claude agent. I basically asked Claude to copy it from
    DualMirrorSingleAcceptorRunning with the additional configuration
    options.
---
 .../amqp/connect/AMQPBrokerConnection.java         |  98 +++++-
 .../amqp/connect/AMQPBrokerConnectionManager.java  |   8 +
 .../connect/mirror/AMQPMirrorControllerSource.java |  21 ++
 .../BrokerConnectConfiguration.java                |  10 +
 .../deployers/impl/FileConfigurationParser.java    |   3 +-
 .../core/remoting/impl/netty/NettyAcceptor.java    |   6 +-
 .../artemis/core/server/BrokerConnection.java      |   5 +
 .../artemis/core/server/lock/LockCoordinator.java  |  57 +++-
 .../resources/schema/artemis-configuration.xsd     |   7 +
 .../config/impl/FileConfigurationParserTest.java   |  11 +-
 docs/user-manual/lock-coordination.adoc            |  96 ++++--
 .../management/ActiveMQServerControlTest.java      |  11 +
 .../starMirrorSingleAcceptor/ZK/A/broker.xml       | 181 +++++++++++
 .../starMirrorSingleAcceptor/ZK/B/broker.xml       | 181 +++++++++++
 .../starMirrorSingleAcceptor/ZK/C/broker.xml       | 181 +++++++++++
 .../starMirrorSingleAcceptor/file/A/broker.xml     | 182 +++++++++++
 .../starMirrorSingleAcceptor/file/B/broker.xml     | 182 +++++++++++
 .../starMirrorSingleAcceptor/file/C/broker.xml     | 182 +++++++++++
 .../smoke/lockmanager/LockCoordinatorTest.java     |  36 +++
 .../StarMirrorSingleAcceptorRunningTest.java       | 349 +++++++++++++++++++++
 20 files changed, 1758 insertions(+), 49 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index 1bb6b4ed8d..54452d4dc4 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -62,6 +62,7 @@ import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.lock.LockCoordinator;
 import org.apache.activemq.artemis.core.server.mirror.MirrorController;
 import 
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
@@ -161,6 +162,38 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
    private final Set<Queue> senders = new HashSet<>();
    private final Set<Queue> receivers = new HashSet<>();
    private final Map<String, Predicate<Link>> linkClosedInterceptors = new 
ConcurrentHashMap<>();
+   private LockCoordinator lockCoordinator;
+
+   /**
+    * This will be false if the lock coordinator is in place and it is not 
holding the lock
+    */
+   private volatile boolean enabled = true;
+
+   /**
+    * This will return false when the LockCoordinator loses the lock.
+    */
+   public boolean isEnabled() {
+      return enabled;
+   }
+
+   private void enable() {
+      enabled = true;
+   }
+
+   private void disable() {
+      enabled = false;
+   }
+
+   @Override
+   public LockCoordinator getLockCoordinator() {
+      return lockCoordinator;
+   }
+
+   @Override
+   public AMQPBrokerConnection setLockCoordinator(LockCoordinator 
lockCoordinator) {
+      this.lockCoordinator = lockCoordinator;
+      return this;
+   }
 
    final Executor connectExecutor;
    final ScheduledExecutorService scheduledExecutorService;
@@ -252,6 +285,19 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
 
    @Override
    public synchronized void start() throws Exception {
+
+      if (lockCoordinator != null) {
+         disable();
+         lockCoordinator.onLockAcquired(this::resume);
+         lockCoordinator.onLockReleased(this::pause);
+      } else {
+         enable();
+      }
+
+      this.internalStart();
+   }
+
+   private synchronized void internalStart() throws Exception {
       if (!started) {
          started = true;
          server.getConfiguration().registerBrokerPlugin(this);
@@ -273,19 +319,21 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
             logger.warn(e.getMessage(), e);
          }
 
-         if (brokerFederation != null) {
-            try {
-               brokerFederation.start();
-            } catch (ActiveMQException e) {
-               logger.warn("Error caught while starting federation instance.", 
e);
+         if (isEnabled()) {
+            if (brokerFederation != null) {
+               try {
+                  brokerFederation.start();
+               } catch (ActiveMQException e) {
+                  logger.warn("Error caught while starting federation 
instance.", e);
+               }
             }
-         }
 
-         if (bridgeManagers != null) {
-            try {
-               bridgeManagers.start();
-            } catch (ActiveMQException e) {
-               logger.warn("Error caught while starting bridge managers 
instance.", e);
+            if (bridgeManagers != null) {
+               try {
+                  bridgeManagers.start();
+               } catch (ActiveMQException e) {
+                  logger.warn("Error caught while starting bridge managers 
instance.", e);
+               }
             }
          }
 
@@ -295,6 +343,12 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
 
    @Override
    public synchronized void stop() {
+      // NOTE: We don't set enabled=false in stop() for the following reason:
+      //       The mirror is supposed to keep capturing events while the broker is 
being shut down or
+      //       the connection is being stopped, or even after it has stopped.
+      //       Say a message is received and recorded as the broker is going 
down:
+      //       if the mirror stopped capturing that event, you would miss it 
the next time you restart the broker,
+      //       and our tests would eventually miss an event or, more 
importantly, users would lose a message during this process.
       if (started) {
          started = false;
          server.getConfiguration().unRegisterBrokerPlugin(this);
@@ -359,6 +413,28 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
       }
    }
 
+   private synchronized void pause() throws Exception {
+      disable();
+      if (bridgeManagers != null) {
+         bridgeManagers.stop();
+      }
+      if (brokerFederation != null) {
+         brokerFederation.stop();
+      }
+   }
+
+
+   private synchronized void resume() throws Exception {
+      enable();
+      if (bridgeManagers != null) {
+         bridgeManagers.start();
+      }
+      if (brokerFederation != null) {
+         brokerFederation.start();
+      }
+   }
+
+
    public ActiveMQServer getServer() {
       return server;
    }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
index 644e07ecc9..95a2284e38 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
@@ -36,6 +36,7 @@ import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBroker
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.lock.LockCoordinator;
 import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
 import 
org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -89,6 +90,13 @@ public class AMQPBrokerConnectionManager implements 
ActiveMQComponent, ClientCon
    private void createBrokerConnection(AMQPBrokerConnectConfiguration 
configuration, boolean start) throws Exception {
       AMQPBrokerConnection amqpBrokerConnection = new 
AMQPBrokerConnection(this, configuration, protonProtocolManagerFactory, server);
       amqpBrokerConnections.put(configuration.getName(), amqpBrokerConnection);
+      if (configuration.getLockCoordinator() != null) {
+         LockCoordinator lockCoordinator = 
server.getLockCoordinator(configuration.getLockCoordinator());
+         if (lockCoordinator == null) {
+            throw new IllegalStateException("lock coordinator " + 
configuration.getName() + " not found");
+         }
+         amqpBrokerConnection.setLockCoordinator(lockCoordinator);
+      }
 
       amqpBrokerConnection.initialize();
 
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index fae26f0293..9bd23b5b9c 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -204,6 +204,9 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
 
    @Override
    public void addAddress(AddressInfo addressInfo) throws Exception {
+      if (!brokerConnection.isEnabled()) {
+         return;
+      }
       logger.trace("{} addAddress {}", server, addressInfo);
 
       if (getControllerInUse() != null && !addressInfo.isInternal()) {
@@ -230,6 +233,9 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
 
    @Override
    public void deleteAddress(AddressInfo addressInfo) throws Exception {
+      if (!brokerConnection.isEnabled()) {
+         return;
+      }
       logger.trace("{} deleteAddress {}", server, addressInfo);
 
       if (invalidTarget(getControllerInUse()) || addressInfo.isInternal()) {
@@ -246,6 +252,9 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
 
    @Override
    public void createQueue(QueueConfiguration queueConfiguration) throws 
Exception {
+      if (!brokerConnection.isEnabled()) {
+         return;
+      }
       logger.trace("{} createQueue {}", server, queueConfiguration);
 
       if (invalidTarget(getControllerInUse()) || 
queueConfiguration.isInternal()) {
@@ -274,6 +283,9 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
 
    @Override
    public void deleteQueue(SimpleString address, SimpleString queue) throws 
Exception {
+      if (!brokerConnection.isEnabled()) {
+         return;
+      }
       if (logger.isTraceEnabled()) {
          logger.trace("{} deleteQueue {}/{}", server, address, queue);
       }
@@ -338,6 +350,9 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
 
    @Override
    public void sendMessage(Transaction tx, Message message, RoutingContext 
context) {
+      if (!brokerConnection.isEnabled()) {
+         return;
+      }
       SimpleString address = context.getAddress(message);
 
       if (context.isInternal()) {
@@ -537,6 +552,9 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
 
    @Override
    public void postAcknowledge(MessageReference ref, AckReason reason) throws 
Exception {
+      if (!brokerConnection.isEnabled()) {
+         return;
+      }
       if (!acks || ref.getQueue().isMirrorController()) {
          postACKInternalMessage(ref);
          return;
@@ -546,6 +564,9 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
 
    @Override
    public void preAcknowledge(final Transaction tx, final MessageReference 
ref, final AckReason reason) throws Exception {
+      if (!brokerConnection.isEnabled()) {
+         return;
+      }
       if (logger.isTraceEnabled()) {
          logger.trace("preAcknowledge::tx={}, ref={}, reason={}", tx, ref, 
reason);
       }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/brokerConnectivity/BrokerConnectConfiguration.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/brokerConnectivity/BrokerConnectConfiguration.java
index 38c5f9865e..ca071c9e1e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/brokerConnectivity/BrokerConnectConfiguration.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/brokerConnectivity/BrokerConnectConfiguration.java
@@ -29,6 +29,7 @@ public abstract class BrokerConnectConfiguration implements 
Serializable {
 
    private static final long serialVersionUID = 8026604526022462048L;
 
+   private String lockCoordinator;
    private String name;
    private String uri;
    private String user;
@@ -42,6 +43,15 @@ public abstract class BrokerConnectConfiguration implements 
Serializable {
       this.uri = uri;
    }
 
+   public String getLockCoordinator() {
+      return lockCoordinator;
+   }
+
+   public BrokerConnectConfiguration setLockCoordinator(String 
lockCoordinator) {
+      this.lockCoordinator = lockCoordinator;
+      return this;
+   }
+
    public abstract void parseURI() throws Exception;
 
    public int getReconnectAttempts() {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index c75e2dc1e6..3e1963b4ed 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -2219,6 +2219,7 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
 
       String uri = e.getAttribute("uri");
 
+      String lockCoordinator = getAttributeValue(e, "lock-coordinator");
       int retryInterval = getAttributeInteger(e, "retry-interval", 5000, 
GT_ZERO);
       int reconnectAttempts = getAttributeInteger(e, "reconnect-attempts", -1, 
MINUS_ONE_OR_GT_ZERO);
       String user = getAttributeValue(e, "user");
@@ -2235,7 +2236,7 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
 
       AMQPBrokerConnectConfiguration config = new 
AMQPBrokerConnectConfiguration(name, uri);
       config.parseURI();
-      
config.setRetryInterval(retryInterval).setReconnectAttempts(reconnectAttempts).setUser(user).setPassword(password).setAutostart(autoStart);
+      
config.setRetryInterval(retryInterval).setReconnectAttempts(reconnectAttempts).setUser(user).setPassword(password).setAutostart(autoStart).setLockCoordinator(lockCoordinator);
 
       mainConfig.addAMQPConnection(config);
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index b473aca7c2..e4bc1ae8aa 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -468,8 +468,10 @@ public class NettyAcceptor extends AbstractAcceptor {
       if (lockCoordinator == null) {
          internalStart();
       } else {
-         lockCoordinator.onLockAcquired(this::internalStart);
-         lockCoordinator.onLockReleased(this::internalStop);
+         // The Acceptor must start before anything else; callbacks run in 
ascending priority order, so it registers with LOW_PRIORITY
+         lockCoordinator.onLockAcquired(this::internalStart, 
LockCoordinator.LOW_PRIORITY);
+         // The Acceptor must stop after everything else; callbacks run in 
ascending priority order, so it registers with HIGH_PRIORITY
+         lockCoordinator.onLockReleased(this::internalStop, 
LockCoordinator.HIGH_PRIORITY);
       }
    }
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java
index 7f7deb9b17..bea2faa093 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.server;
 
 import 
org.apache.activemq.artemis.core.config.brokerConnectivity.BrokerConnectConfiguration;
+import org.apache.activemq.artemis.core.server.lock.LockCoordinator;
 
 /**
  * A broker connection defines a server connection created to provide services 
between this server and another
@@ -32,6 +33,10 @@ public interface BrokerConnection extends ActiveMQComponent {
       // Subclass should override and perform needed cleanup.
    }
 
+   LockCoordinator getLockCoordinator();
+
+   BrokerConnection setLockCoordinator(LockCoordinator lockCoordinator);
+
    /**
     * {@return the unique name of the broker connection}
     */
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java
index b614cb6ce8..e4f9cb7483 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.lock;
 
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -47,6 +48,15 @@ public class LockCoordinator extends 
ActiveMQScheduledComponent {
    /** Default period (in milliseconds) for checking lock status */
    public static final int DEFAULT_CHECK_PERIOD = 5000;
 
+   /** Default priority for callbacks when not specified */
+   public static final int DEFAULT_PRIORITY = 10;
+
+   public static final int HIGH_PRIORITY = 20;
+   public static final int LOW_PRIORITY = 5;
+
+   private record PrioritizedCallback(RunnableEx runnable, int priority) {
+   }
+
    String debugInfo;
 
    public String getDebugInfo() {
@@ -60,8 +70,8 @@ public class LockCoordinator extends 
ActiveMQScheduledComponent {
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-   private final ArrayList<RunnableEx> lockAcquiredCallback = new 
ArrayList<>();
-   private final ArrayList<RunnableEx> lockReleasedCallback = new 
ArrayList<>();
+   private final ArrayList<PrioritizedCallback> lockAcquiredCallback = new 
ArrayList<>();
+   private final ArrayList<PrioritizedCallback> lockReleasedCallback = new 
ArrayList<>();
    private final long checkPeriod;
    private final String name;
    private final String lockID;
@@ -78,6 +88,7 @@ public class LockCoordinator extends 
ActiveMQScheduledComponent {
     * Registers a callback to be executed when lock is acquired.
     * If the lock is already held when this method is called, the callback
     * will be executed immediately (on the executor thread).
+    * Uses the default priority ({@link #DEFAULT_PRIORITY}).
     *
     * Also In case the runnable throws any exceptions, the lock will be 
released, any previously added callback will be called for stop
     * and the monitor will retry the locks
@@ -85,7 +96,23 @@ public class LockCoordinator extends 
ActiveMQScheduledComponent {
     * @param runnable the callback to execute when lock is acquired
     */
    public void onLockAcquired(RunnableEx runnable) {
-      this.lockAcquiredCallback.add(runnable);
+      onLockAcquired(runnable, DEFAULT_PRIORITY);
+   }
+
+   /**
+    * Registers a callback to be executed when lock is acquired with a 
specified priority.
+    * Callbacks are executed in ascending priority order (lowest priority 
first).
+    * If the lock is already held when this method is called, the callback
+    * will be executed immediately (on the executor thread).
+    *
+    * Also, in case the runnable throws an exception, the lock will be 
released, any previously added callbacks will be called for stop,
+    * and the monitor will retry the lock.
+    *
+    * @param runnable the callback to execute when lock is acquired
+    * @param priority the priority for this callback (lower values execute 
first)
+    */
+   public void onLockAcquired(RunnableEx runnable, int priority) {
+      this.lockAcquiredCallback.add(new PrioritizedCallback(runnable, 
priority));
       // if it's locked we run the runnable being added,
       // however we must check this inside the executor
       // or within a global locking
@@ -94,11 +121,23 @@ public class LockCoordinator extends 
ActiveMQScheduledComponent {
 
    /**
     * Registers a callback to be executed when lock is released or lost.
+    * Uses the default priority ({@link #DEFAULT_PRIORITY}).
     *
     * @param runnable the callback to execute when lock is released
     */
    public void onLockReleased(RunnableEx runnable) {
-      this.lockReleasedCallback.add(runnable);
+      onLockReleased(runnable, DEFAULT_PRIORITY);
+   }
+
+   /**
+    * Registers a callback to be executed when lock is released or lost with a 
specified priority.
+    * Callbacks are executed in ascending priority order (lowest priority 
first).
+    *
+    * @param runnable the callback to execute when lock is released
+    * @param priority the priority for this callback (lower values execute 
first)
+    */
+   public void onLockReleased(RunnableEx runnable, int priority) {
+      this.lockReleasedCallback.add(new PrioritizedCallback(runnable, 
priority));
    }
 
    /**
@@ -175,12 +214,18 @@ public class LockCoordinator extends 
ActiveMQScheduledComponent {
       this.locked = locked;
       if (locked) {
          AtomicBoolean treatErrors = new AtomicBoolean(false);
-         lockAcquiredCallback.forEach(r -> doRunTreatingErrors(r, 
treatErrors));
+         // Sort callbacks by priority (lowest first) and execute them
+         lockAcquiredCallback.stream()
+            .sorted(Comparator.comparingInt(pc -> pc.priority))
+            .forEach(pc -> doRunTreatingErrors(pc.runnable, treatErrors));
          if (treatErrors.get()) {
             retryLock();
          }
       } else {
-         lockReleasedCallback.forEach(this::doRunWithLogException);
+         // Sort callbacks by priority (lowest first) and execute them
+         lockReleasedCallback.stream()
+            .sorted(Comparator.comparingInt(pc -> pc.priority))
+            .forEach(pc -> doRunWithLogException(pc.runnable));
       }
    }
 
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd 
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 7354d8dc09..d65b21327d 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2227,6 +2227,13 @@
             </xsd:documentation>
          </xsd:annotation>
       </xsd:attribute>
+      <xsd:attribute name="lock-coordinator" type="xsd:string">
+         <xsd:annotation>
+            <xsd:documentation>
+               The name of the lock-coordinator used to pause and resume this 
broker connection
+            </xsd:documentation>
+         </xsd:annotation>
+      </xsd:attribute>
       <xsd:attribute name="auto-start" type="xsd:boolean" default="true">
          <xsd:annotation>
             <xsd:documentation>
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
index 7f85aaf484..6c81cd83ee 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
@@ -154,6 +154,13 @@ public class FileConfigurationParserTest extends 
ServerTestBase {
          </lock-coordinator>
       </lock-coordinators>""";
 
+   private static final String BROKER_CONNECTION_PART = """
+            <broker-connections>
+               <amqp-connection uri="tcp://someHost:61000" name="someMirror" 
retry-interval="2000" lock-coordinator="my-lock">
+                  <mirror sync="false"/>
+               </amqp-connection>
+            </broker-connections>
+      """;
 
    /**
     * These "InvalidConfigurationTest*.xml" files are modified copies of 
{@literal ConfigurationTest-full-config.xml},
@@ -451,7 +458,7 @@ public class FileConfigurationParserTest extends 
ServerTestBase {
    @Test
    public void testLockCoordinatorParse() throws Exception {
       FileConfigurationParser parser = new FileConfigurationParser();
-      String configStr = FIRST_PART + LOCK_COORDINATOR_PART + LAST_PART;
+      String configStr = FIRST_PART + LOCK_COORDINATOR_PART + 
BROKER_CONNECTION_PART + LAST_PART;
       Configuration configuration = parser.parseMainConfig(new 
ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8)));
 
       Collection<LockCoordinatorConfiguration> lockConfigurations = 
configuration.getLockCoordinatorConfigurations();
@@ -467,6 +474,8 @@ public class FileConfigurationParserTest extends 
ServerTestBase {
          assertEquals("value2", properties.get("test2"));
       }
       configuration.getAcceptorConfigurations().stream().filter(f -> 
f.getName().equals("netty-with-lock")).forEach(f -> assertEquals("my-lock", 
f.getLockCoordinator()));
+      assertEquals(1, configuration.getAMQPConnection().size());
+      assertEquals("my-lock", 
configuration.getAMQPConnection().get(0).getLockCoordinator());
    }
 
    @Test
diff --git a/docs/user-manual/lock-coordination.adoc 
b/docs/user-manual/lock-coordination.adoc
index c5ca2c7466..64ebaeda4d 100644
--- a/docs/user-manual/lock-coordination.adoc
+++ b/docs/user-manual/lock-coordination.adoc
@@ -5,38 +5,39 @@
 
 The Lock Coordinator provides pluggable distributed lock mechanism monitoring.
 It allows multiple broker instances to coordinate the activation of specific 
configuration elements, ensuring that only one broker instance activates a 
particular element at any given time.
+This prevents split-brain scenarios where multiple brokers could 
simultaneously process messages, leading to duplicate processing or conflicting 
state.
 
-When a broker acquires a lock through a distributed lock, the associated 
configuration elements are activated.
-If the lock is lost or released, those elements are deactivated.
+In the current version, the Lock Coordinator can be applied to control the 
startup and shutdown of:
 
-In the current version, the Lock Coordinator can be applied to control the 
startup and shutdown of acceptors.
-When an acceptor is associated with a lock coordinator, it will only start 
accepting connections when the broker successfully acquires the distributed 
lock.
-If lock is lost for any reason, the acceptor automatically stops accepting new 
connections.
+* *Acceptors* - When an acceptor is associated with a lock coordinator, it 
will only start accepting connections when the broker successfully acquires the 
distributed lock. If the lock is lost for any reason, the acceptor 
automatically stops accepting new connections.
 
-The same pattern used on acceptors may eventually be applied to other 
configuration elements.
+* *Broker Connections* - When a broker connection (AMQP mirror, federation, or 
bridge) is associated with a lock coordinator, it will only be active when the 
broker successfully acquires the distributed lock. If the lock is lost, the 
connection is paused, preventing message flow. When the lock is reacquired, the 
connection resumes automatically.
+
+The same pattern used on acceptors and broker connections may eventually be 
applied to other configuration elements.
 If you have ideas for additional use cases where this pattern could be 
applied, please file a JIRA issue.
 
 WARNING: This feature is in technical preview and its configuration elements 
are subject to possible modifications.
 
 == Configuration
 
-It is possible to specify multiple lock-coordinators and associate them with 
other broker elements.
-
-The broker element associated with a lock-coordinator (e.g., an acceptor) will 
only be started if the distributed lock can be acquired.
-If the lock cannot be acquired or is lost, the associated element will be 
stopped.
+You can define multiple lock-coordinators and associate them with broker 
elements such as acceptors and broker connections.
+When a broker element is associated with a lock-coordinator, it will only 
activate when the distributed lock has been acquired.
+If the lock cannot be acquired or is lost, the elements are automatically 
stopped or paused.
 
-This pattern can be used to ensure clients connect to only one of your 
mirrored brokers at a time, preventing split-brain scenarios and duplicate 
message processing.
+Different lock implementations (File-based, ZooKeeper-based, or custom 
plugins) provide different configuration properties.
+See the <<Configuration Options>> section below for detailed tables of 
available options for each lock implementation.
 
-Depending on the provider selector, multiple configuration options can be 
provided.
-Please consult the javadoc for your lock implementation.
-A simple table will be provided in this chapter for the two reference 
implementations we provide, but this could be a plugin being added to your 
broker.
+=== Configuration Example
 
-In this next example, we configure a broker with:
+The following example demonstrates a star topology HA configuration where only 
one broker is active at a time:
 
-* Two acceptors: one for mirroring traffic (`for-mirroring-only`) and one for 
client connections (`for-clients-only`)
-* A File-based lock-coordinator named `clients-lock`
-* The client acceptor associated with the lock-coordinator, so it only 
activates when the distributed lock is acquired
-* A mirror connection to another broker for data replication
+* Two acceptors:
+  - `for-mirroring-only` - Always active to receive mirrored data from other 
brokers
+  - `for-clients-only` - Controlled by lock-coordinator; only active when lock 
is held
+* File-based lock-coordinator named `clients-lock` using shared storage
+* Broker connections controlled by the lock-coordinator (mirrors, bridges, or 
federations):
+  - Two mirror connections (`mirrorB` and `mirrorC`) for data replication
+  - One bridge connection for forwarding messages to a remote broker
 
 [,xml]
 ----
@@ -58,15 +59,44 @@ In this next example, we configure a broker with:
 </lock-coordinators>
 
 <broker-connections>
-   <amqp-connection uri="tcp://otherBroker:61000" name="mirror" 
retry-interval="2000">
+   <amqp-connection uri="tcp://otherBroker:61001" name="mirrorB" 
retry-interval="2000" lock-coordinator="clients-lock">
       <mirror sync="false"/>
    </amqp-connection>
+   <amqp-connection uri="tcp://otherBroker:61002" name="mirrorC" 
retry-interval="2000" lock-coordinator="clients-lock">
+      <mirror sync="false"/>
+   </amqp-connection>
+   <amqp-connection uri="tcp://remoteBroker:61616" name="bridgeConnection" 
retry-interval="2000" lock-coordinator="clients-lock">
+      <bridge>
+         <bridge-to-queue name="orders" remote-address="orders">
+            <include queue-match="orders" address-match="orders"/>
+         </bridge-to-queue>
+      </bridge>
+   </amqp-connection>
 </broker-connections>
 
 
 ----
 
-In the previous configuration, the broker will use a file lock, and the 
acceptor will only be active if it can hold the distributed lock between the 
mirrored brokers.
+==== How It Works
+
+When a broker successfully acquires the distributed lock:
+
+* The client acceptor (`for-clients-only`) starts accepting connections
+* Mirror connections activate and begin replicating data to other brokers
+* Bridge connections activate and forward messages to remote brokers
+* Federation connections activate (if configured)
+
+When the lock is lost (due to failure or network partition):
+
+* The client acceptor stops accepting new connections
+* All broker connections (mirrors, bridges, and federations) pause immediately
+* The broker continues to receive mirrored data on the `for-mirroring-only` 
acceptor
+
+This ensures only one broker actively processes client requests, replicates 
data, and forwards messages at any given time.
+
+==== Important Considerations
+
+WARNING: When mixing bridges or federations with mirrors in the same 
configuration, message duplication is possible during failover transitions. 
When a message arrives at a mirror target and that node's broker connections 
are active, the message may be forwarded even though it was already processed 
by the original source. Lock coordination significantly reduces this risk but 
cannot completely eliminate it during the brief window when locks are being 
transferred between brokers.
 
 image:images/lock-coordination-example.png[HA with mirroring]
 
@@ -74,9 +104,11 @@ You can find a 
https://github.com/apache/artemis-examples/tree/main/examples/fea
 
 == Configuration Options
 
+All lock-coordinator implementations share common configuration elements and 
provide implementation-specific properties.
+
 === Common Configuration
 
-The following elements are configured on lock-coordinator
+The following elements are configured on every lock-coordinator regardless of 
implementation:
 
 [cols="1,1,1,3"]
 |===
@@ -103,10 +135,14 @@ The following elements are configured on lock-coordinator
 |How often to check if the lock is still valid, in milliseconds
 |===
 
-=== File
+=== File-Based Lock Manager
 
-The file-based lock uses the file system to manage distributed locks.
-It is provided by the 
class-name=`org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager`
+The file-based lock manager uses the file system to manage distributed locks 
through file locking mechanisms.
+All brokers must have access to a shared file system location.
+
+**Class name:** 
`org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager`
+
+**Properties:**
 
 [cols="1,1,1,3"]
 |===
@@ -118,10 +154,14 @@ It is provided by the 
class-name=`org.apache.activemq.artemis.lockmanager.file.F
 |Path to the directory where lock files will be created and managed. The 
directory must be created in advance before using this lock.
 |===
 
-=== ZooKeeper
+=== ZooKeeper-Based Lock Manager
+
+The ZooKeeper-based lock manager uses Apache Curator to manage distributed 
locks via a ZooKeeper ensemble.
+This is the recommended approach for production deployments as it provides 
better fault tolerance and doesn't require shared storage.
+
+**Class name:** 
`org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManager`
 
-The ZooKeeper-based lock uses Apache Curator to manage distributed locks via 
ZooKeeper.
-It is provided by the 
class-name=`org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManager`
+**Properties:**
 
 [cols="1,1,1,3"]
 |===
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
index 5bc25a7fa0..d6a5925dce 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
@@ -110,6 +110,7 @@ import 
org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.ServiceComponent;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.ServerLegacyProducersImpl;
+import org.apache.activemq.artemis.core.server.lock.LockCoordinator;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
 import 
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
@@ -6453,6 +6454,16 @@ public class ActiveMQServerControlTest extends 
ManagementTestBase {
             started = true;
          }
 
+         @Override
+         public LockCoordinator getLockCoordinator() {
+            return null;
+         }
+
+         @Override
+         public BrokerConnection setLockCoordinator(LockCoordinator 
lockCoordinator) {
+            return null;
+         }
+
          @Override
          public void stop() throws Exception {
             started = false;
diff --git 
a/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/A/broker.xml
 
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/A/broker.xml
new file mode 100644
index 0000000000..bcea27b38a
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/A/broker.xml
@@ -0,0 +1,181 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq 
/schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="urn:activemq:core ">
+
+      <name>0.0.0.0</name>
+
+      <persistence-enabled>true</persistence-enabled>
+
+      <!-- this could be ASYNCIO or NIO
+       -->
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+      <journal-datasync>true</journal-datasync>
+
+      <journal-min-files>2</journal-min-files>
+
+      <journal-pool-files>-1</journal-pool-files>
+
+      <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+      <security-enabled>false</security-enabled>
+
+      <!-- how often we are looking for how many bytes are being used on the 
disk in ms -->
+      <disk-scan-period>5000</disk-scan-period>
+
+      <!-- once the disk hits this limit the system will block, or close the 
connection in certain protocols
+           that won't support flow control. -->
+      <max-disk-usage>90</max-disk-usage>
+
+      <acceptors>
+
+         <!-- useEpoll means: it will use Netty epoll if you are on a system 
(Linux) that supports it -->
+         <!-- useKQueue means: it will use Netty kqueue if you are on a system 
(MacOS) that supports it -->
+         <!-- amqpCredits: The number of credits sent to AMQP producers -->
+         <!-- amqpLowCredits: The server will send the # credits specified at 
amqpCredits at this low mark -->
+
+         <!-- Acceptor for every supported protocol -->
+         <acceptor 
name="artemis">tcp://0.0.0.0:61000?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+         <acceptor name="forClients" 
lock-coordinator="failover">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+      </acceptors>
+
+      <lock-coordinators>
+         <lock-coordinator name="failover">
+            
<class-name>org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManager</class-name>
+            <lock-id>fail</lock-id>
+            <properties>
+               <property key="connect-string" value="localhost:2181"/>
+            </properties>
+         </lock-coordinator>
+      </lock-coordinators>
+
+
+      <broker-connections>
+         <amqp-connection uri="tcp://localhost:61001" name="mirrorB" 
retry-interval="2000" lock-coordinator="failover">
+            <mirror sync="false"/>
+         </amqp-connection>
+         <amqp-connection uri="tcp://localhost:61002" name="mirrorC" 
retry-interval="2000" lock-coordinator="failover">
+            <mirror sync="false"/>
+         </amqp-connection>
+         <amqp-connection uri="tcp://localhost:61626" name="bridgeTo" 
retry-interval="2000" lock-coordinator="failover">
+            <bridge>
+               <bridge-to-queue name="bridgeQueue" 
remote-address="bridgeQueue">
+                  <include queue-match="bridgeQueue" 
address-match="bridgeQueue"/>
+               </bridge-to-queue>
+            </bridge>
+         </amqp-connection>
+      </broker-connections>
+
+      <security-settings>
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="guest"/>
+            <permission type="deleteNonDurableQueue" roles="guest"/>
+            <permission type="createDurableQueue" roles="guest"/>
+            <permission type="deleteDurableQueue" roles="guest"/>
+            <permission type="createAddress" roles="guest"/>
+            <permission type="deleteAddress" roles="guest"/>
+            <permission type="consume" roles="guest"/>
+            <permission type="browse" roles="guest"/>
+            <permission type="send" roles="guest"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="guest"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to 
be auto-create -->
+         <address-setting match="activemq.management.#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <address-setting match="myQueue">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            <default-max-consumers>1</default-max-consumers>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="DLQ">
+            <anycast>
+               <queue name="DLQ"/>
+            </anycast>
+         </address>
+         <address name="ExpiryQueue">
+            <anycast>
+               <queue name="ExpiryQueue"/>
+            </anycast>
+         </address>
+         <address name="myQueue">
+            <anycast>
+               <queue name="myQueue">
+               </queue>
+            </anycast>
+         </address>
+         <address name="bridgeQueue">
+            <anycast>
+               <queue name="bridgeQueue">
+               </queue>
+            </anycast>
+         </address>
+      </addresses>
+
+   </core>
+</configuration>
diff --git 
a/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/B/broker.xml
 
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/B/broker.xml
new file mode 100644
index 0000000000..f00d1f8b54
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/B/broker.xml
@@ -0,0 +1,181 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq 
/schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="urn:activemq:core ">
+
+      <name>0.0.0.0</name>
+
+      <persistence-enabled>true</persistence-enabled>
+
+      <!-- this could be ASYNCIO or NIO
+       -->
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+      <journal-datasync>true</journal-datasync>
+
+      <journal-min-files>2</journal-min-files>
+
+      <journal-pool-files>-1</journal-pool-files>
+
+      <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+      <security-enabled>false</security-enabled>
+
+      <!-- how often we are looking for how many bytes are being used on the 
disk in ms -->
+      <disk-scan-period>5000</disk-scan-period>
+
+      <!-- once the disk hits this limit the system will block, or close the 
connection in certain protocols
+           that won't support flow control. -->
+      <max-disk-usage>90</max-disk-usage>
+
+      <acceptors>
+
+         <!-- useEpoll means: it will use Netty epoll if you are on a system 
(Linux) that supports it -->
+         <!-- useKQueue means: it will use Netty kqueue if you are on a system 
(MacOS) that supports it -->
+         <!-- amqpCredits: The number of credits sent to AMQP producers -->
+         <!-- amqpLowCredits: The server will send the # credits specified at 
amqpCredits at this low mark -->
+
+         <!-- Acceptor for every supported protocol -->
+         <acceptor 
name="artemis">tcp://0.0.0.0:61001?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+         <acceptor name="forClients" 
lock-coordinator="failover">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+      </acceptors>
+
+      <lock-coordinators>
+         <lock-coordinator name="failover">
+            
<class-name>org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManager</class-name>
+            <lock-id>fail</lock-id>
+            <properties>
+               <property key="connect-string" value="localhost:2181"/>
+            </properties>
+         </lock-coordinator>
+      </lock-coordinators>
+
+
+      <broker-connections>
+         <amqp-connection uri="tcp://localhost:61000" name="mirrorA" 
retry-interval="2000" lock-coordinator="failover">
+            <mirror sync="false"/>
+         </amqp-connection>
+         <amqp-connection uri="tcp://localhost:61002" name="mirrorC" 
retry-interval="2000" lock-coordinator="failover">
+            <mirror sync="false"/>
+         </amqp-connection>
+         <amqp-connection uri="tcp://localhost:61626" name="bridgeTo" 
retry-interval="2000" lock-coordinator="failover">
+            <bridge>
+               <bridge-to-queue name="bridgeQueue" 
remote-address="bridgeQueue">
+                  <include queue-match="bridgeQueue" 
address-match="bridgeQueue"/>
+               </bridge-to-queue>
+            </bridge>
+         </amqp-connection>
+      </broker-connections>
+
+      <security-settings>
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="guest"/>
+            <permission type="deleteNonDurableQueue" roles="guest"/>
+            <permission type="createDurableQueue" roles="guest"/>
+            <permission type="deleteDurableQueue" roles="guest"/>
+            <permission type="createAddress" roles="guest"/>
+            <permission type="deleteAddress" roles="guest"/>
+            <permission type="consume" roles="guest"/>
+            <permission type="browse" roles="guest"/>
+            <permission type="send" roles="guest"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="guest"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to 
be auto-create -->
+         <address-setting match="activemq.management.#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <address-setting match="myQueue">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            <default-max-consumers>1</default-max-consumers>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="DLQ">
+            <anycast>
+               <queue name="DLQ"/>
+            </anycast>
+         </address>
+         <address name="ExpiryQueue">
+            <anycast>
+               <queue name="ExpiryQueue"/>
+            </anycast>
+         </address>
+         <address name="myQueue">
+            <anycast>
+               <queue name="myQueue">
+               </queue>
+            </anycast>
+         </address>
+         <address name="bridgeQueue">
+            <anycast>
+               <queue name="bridgeQueue">
+               </queue>
+            </anycast>
+         </address>
+      </addresses>
+
+   </core>
+</configuration>
diff --git 
a/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/C/broker.xml
 
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/C/broker.xml
new file mode 100644
index 0000000000..da8b162c9c
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/C/broker.xml
@@ -0,0 +1,181 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq 
/schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="urn:activemq:core ">
+
+      <name>0.0.0.0</name>
+
+      <persistence-enabled>true</persistence-enabled>
+
+      <!-- this could be ASYNCIO or NIO
+       -->
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+      <journal-datasync>true</journal-datasync>
+
+      <journal-min-files>2</journal-min-files>
+
+      <journal-pool-files>-1</journal-pool-files>
+
+      <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+      <security-enabled>false</security-enabled>
+
+      <!-- how often we are looking for how many bytes are being used on the 
disk in ms -->
+      <disk-scan-period>5000</disk-scan-period>
+
+      <!-- once the disk hits this limit the system will block, or close the 
connection in certain protocols
+           that won't support flow control. -->
+      <max-disk-usage>90</max-disk-usage>
+
+      <acceptors>
+
+         <!-- useEpoll means: it will use Netty epoll if you are on a system 
(Linux) that supports it -->
+         <!-- useKQueue means: it will use Netty kqueue if you are on a system 
(MacOS) that supports it -->
+         <!-- amqpCredits: The number of credits sent to AMQP producers -->
+         <!-- amqpLowCredits: The server will send the # credits specified at 
amqpCredits at this low mark -->
+
+         <!-- Acceptor for every supported protocol -->
+         <acceptor 
name="artemis">tcp://0.0.0.0:61002?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+         <acceptor name="forClients" 
lock-coordinator="failover">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+      </acceptors>
+
+      <lock-coordinators>
+         <lock-coordinator name="failover">
+            
<class-name>org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManager</class-name>
+            <lock-id>fail</lock-id>
+            <properties>
+               <property key="connect-string" value="localhost:2181"/>
+            </properties>
+         </lock-coordinator>
+      </lock-coordinators>
+
+
+      <broker-connections>
+         <amqp-connection uri="tcp://localhost:61000" name="mirrorA" 
retry-interval="2000" lock-coordinator="failover">
+            <mirror sync="false"/>
+         </amqp-connection>
+         <amqp-connection uri="tcp://localhost:61001" name="mirrorB" 
retry-interval="2000" lock-coordinator="failover">
+            <mirror sync="false"/>
+         </amqp-connection>
+         <amqp-connection uri="tcp://localhost:61626" name="bridgeTo" 
retry-interval="2000" lock-coordinator="failover">
+            <bridge>
+               <bridge-to-queue name="bridgeQueue" 
remote-address="bridgeQueue">
+                  <include queue-match="bridgeQueue" 
address-match="bridgeQueue"/>
+               </bridge-to-queue>
+            </bridge>
+         </amqp-connection>
+      </broker-connections>
+
+      <security-settings>
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="guest"/>
+            <permission type="deleteNonDurableQueue" roles="guest"/>
+            <permission type="createDurableQueue" roles="guest"/>
+            <permission type="deleteDurableQueue" roles="guest"/>
+            <permission type="createAddress" roles="guest"/>
+            <permission type="deleteAddress" roles="guest"/>
+            <permission type="consume" roles="guest"/>
+            <permission type="browse" roles="guest"/>
+            <permission type="send" roles="guest"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="guest"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to 
be auto-create -->
+         <address-setting match="activemq.management.#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <address-setting match="myQueue">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            <default-max-consumers>1</default-max-consumers>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="DLQ">
+            <anycast>
+               <queue name="DLQ"/>
+            </anycast>
+         </address>
+         <address name="ExpiryQueue">
+            <anycast>
+               <queue name="ExpiryQueue"/>
+            </anycast>
+         </address>
+         <address name="myQueue">
+            <anycast>
+               <queue name="myQueue">
+               </queue>
+            </anycast>
+         </address>
+         <address name="bridgeQueue">
+            <anycast>
+               <queue name="bridgeQueue">
+               </queue>
+            </anycast>
+         </address>
+      </addresses>
+
+   </core>
+</configuration>
diff --git 
a/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/A/broker.xml
 
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/A/broker.xml
new file mode 100644
index 0000000000..99fb7c6e4d
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/A/broker.xml
@@ -0,0 +1,182 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq 
/schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="urn:activemq:core ">
+
+      <name>0.0.0.0</name>
+
+      <persistence-enabled>true</persistence-enabled>
+
+      <!-- this could be ASYNCIO or NIO
+       -->
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+      <journal-datasync>true</journal-datasync>
+
+      <journal-min-files>2</journal-min-files>
+
+      <journal-pool-files>-1</journal-pool-files>
+
+      <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+      <security-enabled>false</security-enabled>
+
+      <!-- how often we are looking for how many bytes are being used on the 
disk in ms -->
+      <disk-scan-period>5000</disk-scan-period>
+
+      <!-- once the disk hits this limit the system will block, or close the 
connection in certain protocols
+           that won't support flow control. -->
+      <max-disk-usage>90</max-disk-usage>
+
+      <acceptors>
+
+         <!-- useEpoll means: it will use Netty epoll if you are on a system 
(Linux) that supports it -->
+         <!-- useKQueue means: it will use Netty kqueue if you are on a system 
(MacOS) that supports it -->
+         <!-- amqpCredits: The number of credits sent to AMQP producers -->
+         <!-- amqpLowCredits: The server will send the # credits specified at 
amqpCredits at this low mark -->
+
+         <!-- Acceptor for every supported protocol -->
+         <acceptor 
name="artemis">tcp://0.0.0.0:61000?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+         <acceptor name="forClients" 
lock-coordinator="failover">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+      </acceptors>
+
+      <lock-coordinators>
+         <lock-coordinator name="failover">
+            
<class-name>org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager</class-name>
+            <lock-id>fail</lock-id>
+            <check-period>5000</check-period>
+            <properties>
+               <property key="locks-folder" value="CHANGEME"/>
+            </properties>
+         </lock-coordinator>
+      </lock-coordinators>
+
+
+      <broker-connections>
+         <amqp-connection uri="tcp://localhost:61001" name="mirrorB" 
retry-interval="2000" lock-coordinator="failover">
+            <mirror sync="false"/>
+         </amqp-connection>
+         <amqp-connection uri="tcp://localhost:61002" name="mirrorC" 
retry-interval="2000" lock-coordinator="failover">
+            <mirror sync="false"/>
+         </amqp-connection>
+         <amqp-connection uri="tcp://localhost:61626" name="bridgeTo" 
retry-interval="2000" lock-coordinator="failover">
+            <bridge>
+               <bridge-to-queue name="bridgeQueue" 
remote-address="bridgeQueue">
+                  <include queue-match="bridgeQueue" 
address-match="bridgeQueue"/>
+               </bridge-to-queue>
+            </bridge>
+         </amqp-connection>
+      </broker-connections>
+
+      <security-settings>
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="guest"/>
+            <permission type="deleteNonDurableQueue" roles="guest"/>
+            <permission type="createDurableQueue" roles="guest"/>
+            <permission type="deleteDurableQueue" roles="guest"/>
+            <permission type="createAddress" roles="guest"/>
+            <permission type="deleteAddress" roles="guest"/>
+            <permission type="consume" roles="guest"/>
+            <permission type="browse" roles="guest"/>
+            <permission type="send" roles="guest"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="guest"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to 
be auto-create -->
+         <address-setting match="activemq.management.#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <address-setting match="myQueue">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            <default-max-consumers>1</default-max-consumers>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="DLQ">
+            <anycast>
+               <queue name="DLQ"/>
+            </anycast>
+         </address>
+         <address name="ExpiryQueue">
+            <anycast>
+               <queue name="ExpiryQueue"/>
+            </anycast>
+         </address>
+         <address name="myQueue">
+            <anycast>
+               <queue name="myQueue">
+               </queue>
+            </anycast>
+         </address>
+         <address name="bridgeQueue">
+            <anycast>
+               <queue name="bridgeQueue">
+               </queue>
+            </anycast>
+         </address>
+      </addresses>
+
+   </core>
+</configuration>
diff --git 
a/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/B/broker.xml
 
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/B/broker.xml
new file mode 100644
index 0000000000..2279eab149
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/B/broker.xml
@@ -0,0 +1,182 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq 
/schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="urn:activemq:core ">
+
+      <name>0.0.0.0</name>
+
+      <persistence-enabled>true</persistence-enabled>
+
+      <!-- this could be ASYNCIO or NIO
+       -->
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+      <journal-datasync>true</journal-datasync>
+
+      <journal-min-files>2</journal-min-files>
+
+      <journal-pool-files>-1</journal-pool-files>
+
+      <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+      <security-enabled>false</security-enabled>
+
+      <!-- how often we are looking for how many bytes are being used on the 
disk in ms -->
+      <disk-scan-period>5000</disk-scan-period>
+
+      <!-- once the disk hits this limit the system will block, or close the 
connection in certain protocols
+           that won't support flow control. -->
+      <max-disk-usage>90</max-disk-usage>
+
+      <acceptors>
+
+         <!-- useEpoll means: it will use Netty epoll if you are on a system 
(Linux) that supports it -->
+         <!-- useKQueue means: it will use Netty kqueue if you are on a system 
(MacOS) that supports it -->
+         <!-- amqpCredits: The number of credits sent to AMQP producers -->
+         <!-- amqpLowCredits: The server will send the # credits specified at 
amqpCredits at this low mark -->
+
+         <!-- Acceptor for every supported protocol -->
+         <acceptor 
name="artemis">tcp://0.0.0.0:61001?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+         <acceptor name="forClients" 
lock-coordinator="failover">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+      </acceptors>
+
+      <lock-coordinators>
+         <lock-coordinator name="failover">
+            
<class-name>org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager</class-name>
+            <lock-id>fail</lock-id>
+            <check-period>5000</check-period>
+            <properties>
+               <property key="locks-folder" value="CHANGEME"/>
+            </properties>
+         </lock-coordinator>
+      </lock-coordinators>
+
+
+      <broker-connections>
+         <amqp-connection uri="tcp://localhost:61000" name="mirrorA" 
retry-interval="2000" lock-coordinator="failover">
+            <mirror sync="false"/>
+         </amqp-connection>
+         <amqp-connection uri="tcp://localhost:61002" name="mirrorC" 
retry-interval="2000" lock-coordinator="failover">
+            <mirror sync="false"/>
+         </amqp-connection>
+         <amqp-connection uri="tcp://localhost:61626" name="bridgeTo" 
retry-interval="2000" lock-coordinator="failover">
+            <bridge>
+               <bridge-to-queue name="bridgeQueue" 
remote-address="bridgeQueue">
+                  <include queue-match="bridgeQueue" 
address-match="bridgeQueue"/>
+               </bridge-to-queue>
+            </bridge>
+         </amqp-connection>
+      </broker-connections>
+
+      <security-settings>
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="guest"/>
+            <permission type="deleteNonDurableQueue" roles="guest"/>
+            <permission type="createDurableQueue" roles="guest"/>
+            <permission type="deleteDurableQueue" roles="guest"/>
+            <permission type="createAddress" roles="guest"/>
+            <permission type="deleteAddress" roles="guest"/>
+            <permission type="consume" roles="guest"/>
+            <permission type="browse" roles="guest"/>
+            <permission type="send" roles="guest"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="guest"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to 
be auto-create -->
+         <address-setting match="activemq.management.#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <address-setting match="myQueue">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            <default-max-consumers>1</default-max-consumers>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="DLQ">
+            <anycast>
+               <queue name="DLQ"/>
+            </anycast>
+         </address>
+         <address name="ExpiryQueue">
+            <anycast>
+               <queue name="ExpiryQueue"/>
+            </anycast>
+         </address>
+         <address name="myQueue">
+            <anycast>
+               <queue name="myQueue">
+               </queue>
+            </anycast>
+         </address>
+         <address name="bridgeQueue">
+            <anycast>
+               <queue name="bridgeQueue">
+               </queue>
+            </anycast>
+         </address>
+      </addresses>
+
+   </core>
+</configuration>
diff --git 
a/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/C/broker.xml
 
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/C/broker.xml
new file mode 100644
index 0000000000..5bad7cebee
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/C/broker.xml
@@ -0,0 +1,182 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq 
/schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="urn:activemq:core ">
+
+      <name>0.0.0.0</name>
+
+      <persistence-enabled>true</persistence-enabled>
+
+      <!-- this could be ASYNCIO or NIO
+       -->
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+      <journal-datasync>true</journal-datasync>
+
+      <journal-min-files>2</journal-min-files>
+
+      <journal-pool-files>-1</journal-pool-files>
+
+      <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+      <security-enabled>false</security-enabled>
+
+      <!-- how often we are looking for how many bytes are being used on the 
disk in ms -->
+      <disk-scan-period>5000</disk-scan-period>
+
+      <!-- once the disk hits this limit the system will block, or close the 
connection in certain protocols
+           that won't support flow control. -->
+      <max-disk-usage>90</max-disk-usage>
+
+      <acceptors>
+
+         <!-- useEpoll means: it will use Netty epoll if you are on a system 
(Linux) that supports it -->
+         <!-- useKQueue means: it will use Netty kqueue if you are on a system 
(MacOS) that supports it -->
+         <!-- amqpCredits: The number of credits sent to AMQP producers -->
+         <!-- amqpLowCredits: The server will send the # credits specified at 
amqpCredits at this low mark -->
+
+         <!-- Acceptor for every supported protocol -->
+         <acceptor 
name="artemis">tcp://0.0.0.0:61002?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+         <acceptor name="forClients" 
lock-coordinator="failover">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+      </acceptors>
+
+      <lock-coordinators>
+         <lock-coordinator name="failover">
+            
<class-name>org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager</class-name>
+            <lock-id>fail</lock-id>
+            <check-period>5000</check-period>
+            <properties>
+               <property key="locks-folder" value="CHANGEME"/>
+            </properties>
+         </lock-coordinator>
+      </lock-coordinators>
+
+
+      <broker-connections>
+         <amqp-connection uri="tcp://localhost:61000" name="mirrorA" 
retry-interval="2000" lock-coordinator="failover">
+            <mirror sync="false"/>
+         </amqp-connection>
+         <amqp-connection uri="tcp://localhost:61001" name="mirrorB" 
retry-interval="2000" lock-coordinator="failover">
+            <mirror sync="false"/>
+         </amqp-connection>
+         <amqp-connection uri="tcp://localhost:61626" name="bridgeTo" 
retry-interval="2000" lock-coordinator="failover">
+            <bridge>
+               <bridge-to-queue name="bridgeQueue" 
remote-address="bridgeQueue">
+                  <include queue-match="bridgeQueue" 
address-match="bridgeQueue"/>
+               </bridge-to-queue>
+            </bridge>
+         </amqp-connection>
+      </broker-connections>
+
+      <security-settings>
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="guest"/>
+            <permission type="deleteNonDurableQueue" roles="guest"/>
+            <permission type="createDurableQueue" roles="guest"/>
+            <permission type="deleteDurableQueue" roles="guest"/>
+            <permission type="createAddress" roles="guest"/>
+            <permission type="deleteAddress" roles="guest"/>
+            <permission type="consume" roles="guest"/>
+            <permission type="browse" roles="guest"/>
+            <permission type="send" roles="guest"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="guest"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to 
be auto-create -->
+         <address-setting match="activemq.management.#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <address-setting match="myQueue">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            <default-max-consumers>1</default-max-consumers>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="DLQ">
+            <anycast>
+               <queue name="DLQ"/>
+            </anycast>
+         </address>
+         <address name="ExpiryQueue">
+            <anycast>
+               <queue name="ExpiryQueue"/>
+            </anycast>
+         </address>
+         <address name="myQueue">
+            <anycast>
+               <queue name="myQueue">
+               </queue>
+            </anycast>
+         </address>
+         <address name="bridgeQueue">
+            <anycast>
+               <queue name="bridgeQueue">
+               </queue>
+            </anycast>
+         </address>
+      </addresses>
+
+   </core>
+</configuration>
diff --git 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java
 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java
index 2af144d8fb..426f4e503d 100644
--- 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java
+++ 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java
@@ -106,6 +106,7 @@ public class LockCoordinatorTest extends ActiveMQTestBase {
       testAddAfterLocked(lockCoordinatorSupplier.apply(1).get(0));
       testRetryAfterError(lockCoordinatorSupplier.apply(1).get(0));
       testRetryAfterErrorWithDelayAdd(lockCoordinatorSupplier.apply(1).get(0));
+      testPriorityOrdering(lockCoordinatorSupplier.apply(1).get(0));
 
       {
          List<LockCoordinator> list = lockCoordinatorSupplier.apply(2);
@@ -179,6 +180,41 @@ public class LockCoordinatorTest extends ActiveMQTestBase {
       }
    }
 
+   // validate that acquired/released callbacks run in ascending priority order, whatever the registration order
+   private void testPriorityOrdering(LockCoordinator lockCoordinator) throws Exception {
+      lockHolderCount.set(0);
+      lockChanged.set(0);
+      try {
+         // Add callbacks with different priorities (lower values execute first), registered out of order on purpose
+         // synchronized lists: the callbacks presumably run on the coordinator's thread while this thread polls them
+         List<Integer> executionOrder = java.util.Collections.synchronizedList(new ArrayList<>());
+         lockCoordinator.onLockAcquired(() -> executionOrder.add(3), 30);
+         lockCoordinator.onLockAcquired(() -> executionOrder.add(1), 10);
+         lockCoordinator.onLockAcquired(() -> executionOrder.add(2), 20);
+         lockCoordinator.onLockAcquired(() -> executionOrder.add(4), 40);
+
+         List<Integer> releaseOrder = java.util.Collections.synchronizedList(new ArrayList<>());
+         lockCoordinator.onLockReleased(() -> releaseOrder.add(2), 20);
+         lockCoordinator.onLockReleased(() -> releaseOrder.add(4), 40);
+         lockCoordinator.onLockReleased(() -> releaseOrder.add(1), 10);
+         lockCoordinator.onLockReleased(() -> releaseOrder.add(3), 30);
+
+         lockCoordinator.start();
+         Wait.assertEquals(1, () -> lockHolderCount.get(), 15000, 100);
+
+         // Verify acquired callbacks executed in priority order (lowest first)
+         Wait.assertEquals(4, executionOrder::size, 5000, 100);
+         assertEquals(List.of(1, 2, 3, 4), executionOrder);
+
+         lockCoordinator.stop();
+         // Verify released callbacks executed in priority order (lowest first)
+         Wait.assertEquals(4, releaseOrder::size, 5000, 100);
+         assertEquals(List.of(1, 2, 3, 4), releaseOrder);
+      } finally {
+         lockCoordinator.stop(); // also reached after the stop() above; covers the case where an assertion failed first
+      }
+   }
+
    // validate that no retry would happen since the lock wasn't held in the 
secondLock
    private void testNoRetryWhileNotAcquired(LockCoordinator firstLock, 
LockCoordinator secondLock) throws Exception {
       lockHolderCount.set(0);
diff --git 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/StarMirrorSingleAcceptorRunningTest.java
 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/StarMirrorSingleAcceptorRunningTest.java
new file mode 100644
index 0000000000..5078687005
--- /dev/null
+++ 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/StarMirrorSingleAcceptorRunningTest.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.activemq.artemis.tests.smoke.lockmanager;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.function.Consumer;
+
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import org.apache.activemq.artemis.cli.commands.helper.HelperCreate;
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class StarMirrorSingleAcceptorRunningTest extends SmokeTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   // Instance names of the three brokers used by testAlternatingZK
+   public static final String SERVER_NAME_WITH_ZK_A = 
"lockmanager/starMirrorSingleAcceptor/ZK/A";
+   public static final String SERVER_NAME_WITH_ZK_B = 
"lockmanager/starMirrorSingleAcceptor/ZK/B";
+   public static final String SERVER_NAME_WITH_ZK_C = 
"lockmanager/starMirrorSingleAcceptor/ZK/C";
+
+   // Instance names of the three brokers used by testAlternatingFile
+   public static final String SERVER_NAME_WITH_FILE_A = 
"lockmanager/starMirrorSingleAcceptor/file/A";
+   public static final String SERVER_NAME_WITH_FILE_B = 
"lockmanager/starMirrorSingleAcceptor/file/B";
+   public static final String SERVER_NAME_WITH_FILE_C = 
"lockmanager/starMirrorSingleAcceptor/file/C";
+
+
+   // Instance name of the extra broker created by createBridgeTarget (it owns "bridgeQueue")
+   public static final String SERVER_NAME_BRIDGE_TARGET = 
"lockmanager/starMirrorSingleAcceptor/bridgeTarget";
+
+   // Test constants
+   // number of kill/send/consume/restart rounds executed by testAlternating
+   private static final int ALTERNATING_TEST_ITERATIONS = 3;
+   // messages produced to "myQueue" on every round
+   private static final int MESSAGES_SENT_PER_ITERATION = 100;
+   // messages produced to "bridgeQueue" on every round
+   private static final int BRIDGE_MESSAGES_SENT_PER_ITERATION = 7;
+   // messages consumed from "myQueue" on every round
+   private static final int MESSAGES_CONSUMED_PER_ITERATION = 17;
+   // messages left behind on "myQueue" by a single round (sent - consumed)
+   private static final int MESSAGES_REMAINING_PER_ITERATION = 
MESSAGES_SENT_PER_ITERATION - MESSAGES_CONSUMED_PER_ITERATION;
+   // messages expected on "myQueue" once every round has completed
+   private static final int EXPECTED_FINAL_MESSAGE_COUNT = 
ALTERNATING_TEST_ITERATIONS * MESSAGES_REMAINING_PER_ITERATION;
+
+
+   // base port handed to the ZookeeperCluster created in testAlternatingZK
+   private static final int ZK_BASE_PORT = 2181;
+
+   // Handles of the broker processes started (and killed/restarted) by testAlternating
+   Process processA;
+   Process processB;
+   Process processC;
+   Process bridgeTargetProcess;
+
+   private static void customizeFileServer(File serverLocation, File fileLock) 
{
+      try {
+         FileUtil.findReplace(new File(serverLocation, "/etc/broker.xml"), 
"CHANGEME", fileLock.getAbsolutePath());
+      } catch (Throwable e) {
+         throw new RuntimeException(e.getMessage(), e);
+      }
+   }
+
+   private static void createServers(String serverNameA, String serverNameB, 
String serverNameC,
+                                        String configPathA, String 
configPathB, String configPathC,
+                                        Consumer<File> customizeServer) throws 
Exception {
+      File serverLocationA = getFileServerLocation(serverNameA);
+      File serverLocationB = getFileServerLocation(serverNameB);
+      File serverLocationC = getFileServerLocation(serverNameC);
+      deleteDirectory(serverLocationA);
+      deleteDirectory(serverLocationB);
+      deleteDirectory(serverLocationC);
+
+      createSingleServer(serverLocationA, configPathA, customizeServer);
+      createSingleServer(serverLocationB, configPathB, customizeServer);
+      createSingleServer(serverLocationC, configPathC, customizeServer);
+
+      File bridgeTarget = getFileServerLocation(SERVER_NAME_BRIDGE_TARGET);
+      createBridgeTarget(bridgeTarget);
+   }
+
+   /**
+    * Creates one broker instance through the CLI helper and optionally post-processes it.
+    * <p>
+    * The instance is created with anonymous access allowed, no web console and the
+    * {@code admin}/{@code admin} credentials, matching the ones used by
+    * {@code createBridgeTarget}.
+    *
+    * @param serverLocation  directory where the instance is created
+    * @param configPath      configuration directory handed to the CLI helper
+    * @param customizeServer optional hook invoked with {@code serverLocation} after creation, may be {@code null}
+    * @throws Exception if the instance cannot be created
+    */
+   private static void createSingleServer(File serverLocation, String configPath,
+                                          Consumer<File> customizeServer) throws Exception {
+      HelperCreate cliCreateServer = helperCreate();
+      cliCreateServer.setAllowAnonymous(true)
+                     .setUser("admin")
+                     .setPassword("admin")
+                     .setNoWeb(true)
+                     .setConfiguration(configPath)
+                     .setArtemisInstance(serverLocation);
+      cliCreateServer.createServer();
+
+      if (customizeServer != null) {
+         customizeServer.accept(serverLocation);
+      }
+   }
+
+
+   private static void createBridgeTarget(File serverLocation) throws 
Exception {
+      HelperCreate cliCreateServer = helperCreate();
+      cliCreateServer.setAllowAnonymous(true)
+         .setUser("admin")
+         .setPassword("admin")
+         .setNoWeb(true)
+         .setPortOffset(10)
+         .setArtemisInstance(serverLocation);
+      cliCreateServer.addArgs("--queues", "bridgeQueue");
+      cliCreateServer.createServer();
+   }
+
+
+
+   /**
+    * Per-test setup hook; currently does nothing. The broker instances are created inside
+    * each test method instead (both tests call {@code createServers} themselves).
+    */
+   @BeforeEach
+   public void prepareServers() throws Exception {
+
+   }
+
+   /**
+    * Runs the alternating scenario with the ZK broker configurations, backed by a
+    * ZooKeeper cluster started for the duration of the test.
+    */
+   @Test
+   public void testAlternatingZK() throws Throwable {
+      createServers(SERVER_NAME_WITH_ZK_A, SERVER_NAME_WITH_ZK_B, SERVER_NAME_WITH_ZK_C,
+                    "./src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/A",
+                    "./src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/B",
+                    "./src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/C",
+                    null);
+
+      final String[] instancesToClean = {SERVER_NAME_WITH_ZK_A, SERVER_NAME_WITH_ZK_B, SERVER_NAME_WITH_ZK_C, SERVER_NAME_BRIDGE_TARGET};
+      for (String instance : instancesToClean) {
+         cleanupData(instance);
+      }
+
+      // starting zookeeper, and making sure it is stopped once the test is over
+      final ZookeeperCluster zookeeper = new ZookeeperCluster(temporaryFolder, 1, ZK_BASE_PORT, 100);
+      zookeeper.start();
+      runAfter(zookeeper::stop);
+
+      testAlternating(SERVER_NAME_WITH_ZK_A, SERVER_NAME_WITH_ZK_B, SERVER_NAME_WITH_ZK_C);
+   }
+
+   /**
+    * Runs the alternating scenario with the file broker configurations, using
+    * {@code ./target/serverLock} as the lock directory shared by the three brokers.
+    */
+   @Test
+   public void testAlternatingFile() throws Throwable {
+      File fileLock = new File("./target/serverLock");
+      // mkdirs() also returns false when the directory already exists, hence the second check;
+      // failing here gives a clear message instead of an obscure broker start-up failure later
+      if (!fileLock.mkdirs() && !fileLock.isDirectory()) {
+         fail("Could not create the shared lock directory " + fileLock.getAbsolutePath());
+      }
+
+      createServers(SERVER_NAME_WITH_FILE_A, SERVER_NAME_WITH_FILE_B, SERVER_NAME_WITH_FILE_C,
+                    "./src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/A",
+                    "./src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/B",
+                    "./src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/C",
+                    s -> customizeFileServer(s, fileLock));
+
+      cleanupData(SERVER_NAME_WITH_FILE_A);
+      cleanupData(SERVER_NAME_WITH_FILE_B);
+      cleanupData(SERVER_NAME_WITH_FILE_C);
+      cleanupData(SERVER_NAME_BRIDGE_TARGET);
+
+      testAlternating(SERVER_NAME_WITH_FILE_A, SERVER_NAME_WITH_FILE_B, SERVER_NAME_WITH_FILE_C);
+   }
+
+   /**
+    * Runs the alternating scenario against the three star brokers plus the bridge target.
+    * <p>
+    * On every iteration two of the three brokers are killed, messages are produced to and
+    * partially consumed from {@code myQueue} through {@code tcp://localhost:61616}, the killed
+    * brokers are restarted, and the queue depth is then verified on ports 61000, 61001 and
+    * 61002. Messages sent to {@code bridgeQueue} are expected on port 61626 only.
+    *
+    * @param nameServerA instance name used to start broker A
+    * @param nameServerB instance name used to start broker B
+    * @param nameServerC instance name used to start broker C
+    * @throws Throwable on any failure
+    */
+   public void testAlternating(String nameServerA, String nameServerB, String 
nameServerC) throws Throwable {
+      // NOTE(review): only A is started with 60_000 as the last argument, the others pass -1;
+      // presumably a start-up wait timeout - confirm against SmokeTestBase.startServer
+      processA = startServer(nameServerA, 0, 60_000);
+      processB = startServer(nameServerB, 0, -1);
+      processC = startServer(nameServerC, 0, -1);
+      bridgeTargetProcess = startServer(SERVER_NAME_BRIDGE_TARGET, 0, -1);
+
+      ConnectionFactory cfX = CFUtil.createConnectionFactory("amqp", 
"tcp://localhost:61616");
+
+      final String queueName = "myQueue";
+      final String bridgeQueue = "bridgeQueue";
+
+      // sanity check with all brokers up, before any of them is killed
+      validateStar(cfX);
+
+      for (int i = 0; i < ALTERNATING_TEST_ITERATIONS; i++) {
+         String activeServer = (i % 3 == 0) ? "A" : (i % 3 == 1) ? "B" : "C";
+         logger.info("Iteration {}: Server {} active", i, activeServer);
+
+         if (i % 3 == 0) {
+            // Iteration 0, 3, 6, ...: Server A active, kill Server B and C
+            killServer(processB);
+            killServer(processC);
+         } else if (i % 3 == 1) {
+            // Iteration 1, 4, 7, ...: Server B active, kill Server A and C
+            killServer(processA);
+            killServer(processC);
+         } else {
+            // Iteration 2, 5, 8, ...: Server C active, kill Server A and B
+            killServer(processA);
+            killServer(processB);
+         }
+
+         // Send messages through the shared acceptor
+         sendMessages(cfX, queueName, MESSAGES_SENT_PER_ITERATION);
+
+         // Consume some messages
+         receiveMessages(cfX, queueName, MESSAGES_CONSUMED_PER_ITERATION);
+
+         // Restart the killed servers
+         if (i % 3 == 0) {
+            processB = startServer(nameServerB, 0, -1);
+            processC = startServer(nameServerC, 0, -1);
+         } else if (i % 3 == 1) {
+            processA = startServer(nameServerA, 0, -1);
+            processC = startServer(nameServerC, 0, -1);
+         } else {
+            processA = startServer(nameServerA, 0, -1);
+            processB = startServer(nameServerB, 0, -1);
+         }
+
+         // with every broker running again, the mirror queues checked by assertEmptySNFs must be empty
+         assertEmptySNFs();
+
+         // each round leaves MESSAGES_REMAINING_PER_ITERATION messages behind, so the expected depth accumulates
+         assertMessageCount("tcp://localhost:61000", queueName, 
MESSAGES_REMAINING_PER_ITERATION * (i + 1));
+         assertMessageCount("tcp://localhost:61001", queueName, 
MESSAGES_REMAINING_PER_ITERATION * (i + 1));
+         assertMessageCount("tcp://localhost:61002", queueName, 
MESSAGES_REMAINING_PER_ITERATION * (i + 1));
+
+         // bridgeQueue messages must pile up on port 61626 only and never stay on the star brokers
+         sendMessages(cfX, bridgeQueue, BRIDGE_MESSAGES_SENT_PER_ITERATION);
+         assertMessageCount("tcp://localhost:61626", bridgeQueue, 
BRIDGE_MESSAGES_SENT_PER_ITERATION * (i + 1));
+         assertMessageCount("tcp://localhost:61000", bridgeQueue, 0);
+         assertMessageCount("tcp://localhost:61001", bridgeQueue, 0);
+         assertMessageCount("tcp://localhost:61002", bridgeQueue, 0);
+      }
+
+      // Verify they all have the expected message count (iterations × (sent - 
consumed))
+      assertMessageCount("tcp://localhost:61000", queueName, 
EXPECTED_FINAL_MESSAGE_COUNT);
+      assertMessageCount("tcp://localhost:61001", queueName, 
EXPECTED_FINAL_MESSAGE_COUNT);
+      assertMessageCount("tcp://localhost:61002", queueName, 
EXPECTED_FINAL_MESSAGE_COUNT);
+
+      assertMessageCount("tcp://localhost:61000", bridgeQueue, 0);
+      assertMessageCount("tcp://localhost:61001", bridgeQueue, 0);
+      assertMessageCount("tcp://localhost:61002", bridgeQueue, 0);
+
+      assertMessageCount("tcp://localhost:61626", bridgeQueue, 
BRIDGE_MESSAGES_SENT_PER_ITERATION * ALTERNATING_TEST_ITERATIONS);
+
+      // drain what is left and verify the three brokers agree on an empty queue
+      receiveMessages(cfX, queueName, EXPECTED_FINAL_MESSAGE_COUNT);
+      assertMessageCount("tcp://localhost:61000", queueName, 0);
+      assertMessageCount("tcp://localhost:61001", queueName, 0);
+      assertMessageCount("tcp://localhost:61002", queueName, 0);
+      assertEmptySNFs();
+
+      // same sanity check as at the beginning, now after all the kill/restart rounds
+      validateStar(cfX);
+   }
+
+   private void assertEmptySNFs() throws Exception {
+      assertMessageCount("tcp://localhost:61000", 
"$ACTIVEMQ_ARTEMIS_MIRROR_mirrorB", 0);
+      assertMessageCount("tcp://localhost:61000", 
"$ACTIVEMQ_ARTEMIS_MIRROR_mirrorC", 0);
+
+      assertMessageCount("tcp://localhost:61001", 
"$ACTIVEMQ_ARTEMIS_MIRROR_mirrorA", 0);
+      assertMessageCount("tcp://localhost:61001", 
"$ACTIVEMQ_ARTEMIS_MIRROR_mirrorC", 0);
+
+      assertMessageCount("tcp://localhost:61002", 
"$ACTIVEMQ_ARTEMIS_MIRROR_mirrorA", 0);
+      assertMessageCount("tcp://localhost:61002", 
"$ACTIVEMQ_ARTEMIS_MIRROR_mirrorB", 0);
+   }
+
+   private void validateStar(ConnectionFactory cfX) throws Exception {
+      // validate the star combination
+      sendMessages(cfX, "myQueue", 1_000);
+      assertMessageCount("tcp://localhost:61000", "myQueue", 1_000);
+      assertMessageCount("tcp://localhost:61001", "myQueue", 1_000);
+      assertMessageCount("tcp://localhost:61002", "myQueue", 1_000);
+
+      // validate the star combination
+      receiveMessages(cfX, "myQueue", 1_000);
+      assertMessageCount("tcp://localhost:61000", "myQueue", 0);
+      assertMessageCount("tcp://localhost:61001", "myQueue", 0);
+      assertMessageCount("tcp://localhost:61002", "myQueue", 0);
+   }
+
+   private static void sendMessages(ConnectionFactory cfX, String queueName, 
int nmessages) throws JMSException {
+      try (Connection connectionX = retryUntilIsLive(cfX)) {
+         Session sessionX = connectionX.createSession(true, 
Session.SESSION_TRANSACTED);
+         Queue queue = sessionX.createQueue(queueName);
+         MessageProducer producerX = sessionX.createProducer(queue);
+         for (int i = 0; i < nmessages; i++) {
+            producerX.send(sessionX.createTextMessage("hello " + i));
+         }
+         sessionX.commit();
+      }
+   }
+
+   /**
+    * Tries to create a connection, retrying every 500 milliseconds for up to 1000 attempts.
+    * <p>
+    * The test is failed when every attempt failed (the last connection exception is attached
+    * as the cause) or when the thread is interrupted while waiting between attempts.
+    *
+    * @param cfX connection factory to connect with
+    * @return the first connection that could be created
+    */
+   private static Connection retryUntilIsLive(ConnectionFactory cfX) {
+      final int maxRetry = 1000;
+      Exception lastFailure = null;
+      for (int i = 0; i < maxRetry; i++) {
+         try {
+            return cfX.createConnection();
+         } catch (Exception ex) {
+            lastFailure = ex;
+            logger.info("Exception during connection, retrying the connection... {} out of {} retries, message = {}", i, maxRetry, ex.getMessage());
+         }
+
+         try {
+            Thread.sleep(500);
+         } catch (InterruptedException interrupted) {
+            // do not swallow the interruption: restore the flag and give up right away
+            Thread.currentThread().interrupt();
+            fail("Interrupted while waiting to retry the connection", interrupted);
+         }
+      }
+      fail("Could not connect after " + maxRetry + " retries", lastFailure);
+      return null; // never happening, fail will throw an exception
+   }
+
+   private static void receiveMessages(ConnectionFactory cfX, String 
queueName, int nmessages) throws JMSException {
+      try (Connection connectionX = retryUntilIsLive(cfX)) {
+         connectionX.start();
+         Session sessionX = connectionX.createSession(true, 
Session.SESSION_TRANSACTED);
+         Queue queue = sessionX.createQueue(queueName);
+         MessageConsumer consumerX = sessionX.createConsumer(queue);
+         for (int i = 0; i < nmessages; i++) {
+            TextMessage message = (TextMessage) consumerX.receive(5000);
+            assertNotNull(message, "Expected message " + i + " but got null");
+         }
+         sessionX.commit();
+      }
+   }
+
+   /**
+    * Waits until the given queue on the broker at {@code uri} reports exactly {@code count}
+    * messages, failing the test otherwise.
+    * <p>
+    * Failures while querying the broker are reported as a count of {@code -1} so that the
+    * wait keeps polling instead of aborting. The management client is closed before returning.
+    *
+    * @param uri       management URI of the broker to query
+    * @param queueName queue whose message count is checked
+    * @param count     expected number of messages
+    * @throws Exception if the count is never reached or the management client fails to close
+    */
+   protected void assertMessageCount(String uri, String queueName, long count) throws Exception {
+      // SimpleManagement is AutoCloseable: close it so that every call does not leak a connection
+      try (SimpleManagement simpleManagement = new SimpleManagement(uri, null, null)) {
+         Wait.assertEquals(count, () -> {
+            try {
+               long result = simpleManagement.getMessageCountOnQueue(queueName);
+               if (count != result) {
+                  logger.info("validating {} on queue {} expecting count = {}, result = {}", uri, queueName, count, result);
+               }
+               return result;
+            } catch (Throwable e) {
+               // best effort: the query is retried on the next poll
+               logger.debug("could not read the message count of {} on {}", queueName, uri, e);
+               return -1;
+            }
+         });
+      }
+   }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to