This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 007489597b ARTEMIS-5925 Support Star Mirror Configuration on Lock
Coordinator
007489597b is described below
commit 007489597b5ac9fb7477b5d29edec8d4184f1a92
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Mar 2 10:02:16 2026 -0500
ARTEMIS-5925 Support Star Mirror Configuration on Lock Coordinator
Disclaimer: The test StarMirrorSingleAcceptorRunningTest was written with
the help
from Claude agent. I basically asked Claude to copy it from
DualMirrorSingleAcceptorRunning with the additional configuration
options.
---
.../amqp/connect/AMQPBrokerConnection.java | 98 +++++-
.../amqp/connect/AMQPBrokerConnectionManager.java | 8 +
.../connect/mirror/AMQPMirrorControllerSource.java | 21 ++
.../BrokerConnectConfiguration.java | 10 +
.../deployers/impl/FileConfigurationParser.java | 3 +-
.../core/remoting/impl/netty/NettyAcceptor.java | 6 +-
.../artemis/core/server/BrokerConnection.java | 5 +
.../artemis/core/server/lock/LockCoordinator.java | 57 +++-
.../resources/schema/artemis-configuration.xsd | 7 +
.../config/impl/FileConfigurationParserTest.java | 11 +-
docs/user-manual/lock-coordination.adoc | 96 ++++--
.../management/ActiveMQServerControlTest.java | 11 +
.../starMirrorSingleAcceptor/ZK/A/broker.xml | 181 +++++++++++
.../starMirrorSingleAcceptor/ZK/B/broker.xml | 181 +++++++++++
.../starMirrorSingleAcceptor/ZK/C/broker.xml | 181 +++++++++++
.../starMirrorSingleAcceptor/file/A/broker.xml | 182 +++++++++++
.../starMirrorSingleAcceptor/file/B/broker.xml | 182 +++++++++++
.../starMirrorSingleAcceptor/file/C/broker.xml | 182 +++++++++++
.../smoke/lockmanager/LockCoordinatorTest.java | 36 +++
.../StarMirrorSingleAcceptorRunningTest.java | 349 +++++++++++++++++++++
20 files changed, 1758 insertions(+), 49 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index 1bb6b4ed8d..54452d4dc4 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -62,6 +62,7 @@ import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.lock.LockCoordinator;
import org.apache.activemq.artemis.core.server.mirror.MirrorController;
import
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
@@ -161,6 +162,38 @@ public class AMQPBrokerConnection implements
ClientConnectionLifeCycleListener,
private final Set<Queue> senders = new HashSet<>();
private final Set<Queue> receivers = new HashSet<>();
private final Map<String, Predicate<Link>> linkClosedInterceptors = new
ConcurrentHashMap<>();
+ private LockCoordinator lockCoordinator;
+
+ /**
+ * This will be false if the lock coordinator is in place and it is not
holding the lock
+ */
+ private volatile boolean enabled = true;
+
+ /**
+ * This will return false when the LockCoordinator lose the lock.
+ * */
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ private void enable() {
+ enabled = true;
+ }
+
+ private void disable() {
+ enabled = false;
+ }
+
+ @Override
+ public LockCoordinator getLockCoordinator() {
+ return lockCoordinator;
+ }
+
+ @Override
+ public AMQPBrokerConnection setLockCoordinator(LockCoordinator
lockCoordinator) {
+ this.lockCoordinator = lockCoordinator;
+ return this;
+ }
final Executor connectExecutor;
final ScheduledExecutorService scheduledExecutorService;
@@ -252,6 +285,19 @@ public class AMQPBrokerConnection implements
ClientConnectionLifeCycleListener,
@Override
public synchronized void start() throws Exception {
+
+ if (lockCoordinator != null) {
+ disable();
+ lockCoordinator.onLockAcquired(this::resume);
+ lockCoordinator.onLockReleased(this::pause);
+ } else {
+ enable();
+ }
+
+ this.internalStart();
+ }
+
+ private synchronized void internalStart() throws Exception {
if (!started) {
started = true;
server.getConfiguration().registerBrokerPlugin(this);
@@ -273,19 +319,21 @@ public class AMQPBrokerConnection implements
ClientConnectionLifeCycleListener,
logger.warn(e.getMessage(), e);
}
- if (brokerFederation != null) {
- try {
- brokerFederation.start();
- } catch (ActiveMQException e) {
- logger.warn("Error caught while starting federation instance.",
e);
+ if (isEnabled()) {
+ if (brokerFederation != null) {
+ try {
+ brokerFederation.start();
+ } catch (ActiveMQException e) {
+ logger.warn("Error caught while starting federation
instance.", e);
+ }
}
- }
- if (bridgeManagers != null) {
- try {
- bridgeManagers.start();
- } catch (ActiveMQException e) {
- logger.warn("Error caught while starting bridge managers
instance.", e);
+ if (bridgeManagers != null) {
+ try {
+ bridgeManagers.start();
+ } catch (ActiveMQException e) {
+ logger.warn("Error caught while starting bridge managers
instance.", e);
+ }
}
}
@@ -295,6 +343,12 @@ public class AMQPBrokerConnection implements
ClientConnectionLifeCycleListener,
@Override
public synchronized void stop() {
+ // NOTE: We don't set enabled=false in the stop for the following reason:
+ // Mirror is supposed to keep capturing events as the broker is
being shutdown or
+ // the connection is being stopped, or even after stopped.
+ // Say as the broker is going down a message is received and
recorded
+ // if the mirror stopped capturing that event, you would miss it
for next time you restart the broker
+ // and our tests would eventually miss an event, or more
importantly users would lose a message during this process
if (started) {
started = false;
server.getConfiguration().unRegisterBrokerPlugin(this);
@@ -359,6 +413,28 @@ public class AMQPBrokerConnection implements
ClientConnectionLifeCycleListener,
}
}
+ private synchronized void pause() throws Exception {
+ disable();
+ if (bridgeManagers != null) {
+ bridgeManagers.stop();
+ }
+ if (brokerFederation != null) {
+ brokerFederation.stop();
+ }
+ }
+
+
+ private synchronized void resume() throws Exception {
+ enable();
+ if (bridgeManagers != null) {
+ bridgeManagers.start();
+ }
+ if (brokerFederation != null) {
+ brokerFederation.start();
+ }
+ }
+
+
public ActiveMQServer getServer() {
return server;
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
index 644e07ecc9..95a2284e38 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java
@@ -36,6 +36,7 @@ import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBroker
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.lock.LockCoordinator;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import
org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -89,6 +90,13 @@ public class AMQPBrokerConnectionManager implements
ActiveMQComponent, ClientCon
private void createBrokerConnection(AMQPBrokerConnectConfiguration
configuration, boolean start) throws Exception {
AMQPBrokerConnection amqpBrokerConnection = new
AMQPBrokerConnection(this, configuration, protonProtocolManagerFactory, server);
amqpBrokerConnections.put(configuration.getName(), amqpBrokerConnection);
+ if (configuration.getLockCoordinator() != null) {
+ LockCoordinator lockCoordinator =
server.getLockCoordinator(configuration.getLockCoordinator());
+ if (lockCoordinator == null) {
+ throw new IllegalStateException("lock coordinator " +
configuration.getName() + " not found");
+ }
+ amqpBrokerConnection.setLockCoordinator(lockCoordinator);
+ }
amqpBrokerConnection.initialize();
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index fae26f0293..9bd23b5b9c 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -204,6 +204,9 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
@Override
public void addAddress(AddressInfo addressInfo) throws Exception {
+ if (!brokerConnection.isEnabled()) {
+ return;
+ }
logger.trace("{} addAddress {}", server, addressInfo);
if (getControllerInUse() != null && !addressInfo.isInternal()) {
@@ -230,6 +233,9 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
@Override
public void deleteAddress(AddressInfo addressInfo) throws Exception {
+ if (!brokerConnection.isEnabled()) {
+ return;
+ }
logger.trace("{} deleteAddress {}", server, addressInfo);
if (invalidTarget(getControllerInUse()) || addressInfo.isInternal()) {
@@ -246,6 +252,9 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
@Override
public void createQueue(QueueConfiguration queueConfiguration) throws
Exception {
+ if (!brokerConnection.isEnabled()) {
+ return;
+ }
logger.trace("{} createQueue {}", server, queueConfiguration);
if (invalidTarget(getControllerInUse()) ||
queueConfiguration.isInternal()) {
@@ -274,6 +283,9 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
@Override
public void deleteQueue(SimpleString address, SimpleString queue) throws
Exception {
+ if (!brokerConnection.isEnabled()) {
+ return;
+ }
if (logger.isTraceEnabled()) {
logger.trace("{} deleteQueue {}/{}", server, address, queue);
}
@@ -338,6 +350,9 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
@Override
public void sendMessage(Transaction tx, Message message, RoutingContext
context) {
+ if (!brokerConnection.isEnabled()) {
+ return;
+ }
SimpleString address = context.getAddress(message);
if (context.isInternal()) {
@@ -537,6 +552,9 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
@Override
public void postAcknowledge(MessageReference ref, AckReason reason) throws
Exception {
+ if (!brokerConnection.isEnabled()) {
+ return;
+ }
if (!acks || ref.getQueue().isMirrorController()) {
postACKInternalMessage(ref);
return;
@@ -546,6 +564,9 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
@Override
public void preAcknowledge(final Transaction tx, final MessageReference
ref, final AckReason reason) throws Exception {
+ if (!brokerConnection.isEnabled()) {
+ return;
+ }
if (logger.isTraceEnabled()) {
logger.trace("preAcknowledge::tx={}, ref={}, reason={}", tx, ref,
reason);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/brokerConnectivity/BrokerConnectConfiguration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/brokerConnectivity/BrokerConnectConfiguration.java
index 38c5f9865e..ca071c9e1e 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/brokerConnectivity/BrokerConnectConfiguration.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/brokerConnectivity/BrokerConnectConfiguration.java
@@ -29,6 +29,7 @@ public abstract class BrokerConnectConfiguration implements
Serializable {
private static final long serialVersionUID = 8026604526022462048L;
+ private String lockCoordinator;
private String name;
private String uri;
private String user;
@@ -42,6 +43,15 @@ public abstract class BrokerConnectConfiguration implements
Serializable {
this.uri = uri;
}
+ public String getLockCoordinator() {
+ return lockCoordinator;
+ }
+
+ public BrokerConnectConfiguration setLockCoordinator(String
lockCoordinator) {
+ this.lockCoordinator = lockCoordinator;
+ return this;
+ }
+
public abstract void parseURI() throws Exception;
public int getReconnectAttempts() {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index c75e2dc1e6..3e1963b4ed 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -2219,6 +2219,7 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
String uri = e.getAttribute("uri");
+ String lockCoordinator = getAttributeValue(e, "lock-coordinator");
int retryInterval = getAttributeInteger(e, "retry-interval", 5000,
GT_ZERO);
int reconnectAttempts = getAttributeInteger(e, "reconnect-attempts", -1,
MINUS_ONE_OR_GT_ZERO);
String user = getAttributeValue(e, "user");
@@ -2235,7 +2236,7 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
AMQPBrokerConnectConfiguration config = new
AMQPBrokerConnectConfiguration(name, uri);
config.parseURI();
-
config.setRetryInterval(retryInterval).setReconnectAttempts(reconnectAttempts).setUser(user).setPassword(password).setAutostart(autoStart);
+
config.setRetryInterval(retryInterval).setReconnectAttempts(reconnectAttempts).setUser(user).setPassword(password).setAutostart(autoStart).setLockCoordinator(lockCoordinator);
mainConfig.addAMQPConnection(config);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index b473aca7c2..e4bc1ae8aa 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -468,8 +468,10 @@ public class NettyAcceptor extends AbstractAcceptor {
if (lockCoordinator == null) {
internalStart();
} else {
- lockCoordinator.onLockAcquired(this::internalStart);
- lockCoordinator.onLockReleased(this::internalStop);
+ // The Acceptor needs to start before anything else, so low priority
to start
+ lockCoordinator.onLockAcquired(this::internalStart,
LockCoordinator.LOW_PRIORITY);
+ // And the Acceptor needs to stop after everything else, so high
priority to stop
+ lockCoordinator.onLockReleased(this::internalStop,
LockCoordinator.HIGH_PRIORITY);
}
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java
index 7f7deb9b17..bea2faa093 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BrokerConnection.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server;
import
org.apache.activemq.artemis.core.config.brokerConnectivity.BrokerConnectConfiguration;
+import org.apache.activemq.artemis.core.server.lock.LockCoordinator;
/**
* A broker connection defines a server connection created to provide services
between this server and another
@@ -32,6 +33,10 @@ public interface BrokerConnection extends ActiveMQComponent {
// Subclass should override and perform needed cleanup.
}
+ LockCoordinator getLockCoordinator();
+
+ BrokerConnection setLockCoordinator(LockCoordinator lockCoordinator);
+
/**
* {@return the unique name of the broker connection}
*/
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java
index b614cb6ce8..e4f9cb7483 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/lock/LockCoordinator.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.lock;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -47,6 +48,15 @@ public class LockCoordinator extends
ActiveMQScheduledComponent {
/** Default period (in milliseconds) for checking lock status */
public static final int DEFAULT_CHECK_PERIOD = 5000;
+ /** Default priority for callbacks when not specified */
+ public static final int DEFAULT_PRIORITY = 10;
+
+ public static final int HIGH_PRIORITY = 20;
+ public static final int LOW_PRIORITY = 5;
+
+ private record PrioritizedCallback(RunnableEx runnable, int priority) {
+ }
+
String debugInfo;
public String getDebugInfo() {
@@ -60,8 +70,8 @@ public class LockCoordinator extends
ActiveMQScheduledComponent {
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final ArrayList<RunnableEx> lockAcquiredCallback = new
ArrayList<>();
- private final ArrayList<RunnableEx> lockReleasedCallback = new
ArrayList<>();
+ private final ArrayList<PrioritizedCallback> lockAcquiredCallback = new
ArrayList<>();
+ private final ArrayList<PrioritizedCallback> lockReleasedCallback = new
ArrayList<>();
private final long checkPeriod;
private final String name;
private final String lockID;
@@ -78,6 +88,7 @@ public class LockCoordinator extends
ActiveMQScheduledComponent {
* Registers a callback to be executed when lock is acquired.
* If the lock is already held when this method is called, the callback
* will be executed immediately (on the executor thread).
+ * Uses the default priority ({@link #DEFAULT_PRIORITY}).
*
* Also In case the runnable throws any exceptions, the lock will be
released, any previously added callback will be called for stop
* and the monitor will retry the locks
@@ -85,7 +96,23 @@ public class LockCoordinator extends
ActiveMQScheduledComponent {
* @param runnable the callback to execute when lock is acquired
*/
public void onLockAcquired(RunnableEx runnable) {
- this.lockAcquiredCallback.add(runnable);
+ onLockAcquired(runnable, DEFAULT_PRIORITY);
+ }
+
+ /**
+ * Registers a callback to be executed when lock is acquired with a
specified priority.
+ * Callbacks are executed in ascending priority order (lowest priority
first).
+ * If the lock is already held when this method is called, the callback
+ * will be executed immediately (on the executor thread).
+ *
+ * Also In case the runnable throws any exceptions, the lock will be
released, any previously added callback will be called for stop
+ * and the monitor will retry the locks
+ *
+ * @param runnable the callback to execute when lock is acquired
+ * @param priority the priority for this callback (lower values execute
first)
+ */
+ public void onLockAcquired(RunnableEx runnable, int priority) {
+ this.lockAcquiredCallback.add(new PrioritizedCallback(runnable,
priority));
// if it's locked we run the runnable being added,
// however we must check this inside the executor
// or within a global locking
@@ -94,11 +121,23 @@ public class LockCoordinator extends
ActiveMQScheduledComponent {
/**
* Registers a callback to be executed when lock is released or lost.
+ * Uses the default priority ({@link #DEFAULT_PRIORITY}).
*
* @param runnable the callback to execute when lock is released
*/
public void onLockReleased(RunnableEx runnable) {
- this.lockReleasedCallback.add(runnable);
+ onLockReleased(runnable, DEFAULT_PRIORITY);
+ }
+
+ /**
+ * Registers a callback to be executed when lock is released or lost with a
specified priority.
+ * Callbacks are executed in ascending priority order (lowest priority
first).
+ *
+ * @param runnable the callback to execute when lock is released
+ * @param priority the priority for this callback (lower values execute
first)
+ */
+ public void onLockReleased(RunnableEx runnable, int priority) {
+ this.lockReleasedCallback.add(new PrioritizedCallback(runnable,
priority));
}
/**
@@ -175,12 +214,18 @@ public class LockCoordinator extends
ActiveMQScheduledComponent {
this.locked = locked;
if (locked) {
AtomicBoolean treatErrors = new AtomicBoolean(false);
- lockAcquiredCallback.forEach(r -> doRunTreatingErrors(r,
treatErrors));
+ // Sort callbacks by priority (lowest first) and execute them
+ lockAcquiredCallback.stream()
+ .sorted(Comparator.comparingInt(pc -> pc.priority))
+ .forEach(pc -> doRunTreatingErrors(pc.runnable, treatErrors));
if (treatErrors.get()) {
retryLock();
}
} else {
- lockReleasedCallback.forEach(this::doRunWithLogException);
+ // Sort callbacks by priority (lowest first) and execute them
+ lockReleasedCallback.stream()
+ .sorted(Comparator.comparingInt(pc -> pc.priority))
+ .forEach(pc -> doRunWithLogException(pc.runnable));
}
}
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 7354d8dc09..d65b21327d 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2227,6 +2227,13 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute name="lock-coordinator" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation>
+ The lock-coordinator used to pause and resume this broker
connection
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
<xsd:attribute name="auto-start" type="xsd:boolean" default="true">
<xsd:annotation>
<xsd:documentation>
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
index 7f85aaf484..6c81cd83ee 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
@@ -154,6 +154,13 @@ public class FileConfigurationParserTest extends
ServerTestBase {
</lock-coordinator>
</lock-coordinators>""";
+ private static final String BROKER_CONNECTION_PART = """
+ <broker-connections>
+ <amqp-connection uri="tcp://someHost:61000" name="someMirror"
retry-interval="2000" lock-coordinator="my-lock">
+ <mirror sync="false"/>
+ </amqp-connection>
+ </broker-connections>
+ """;
/**
* These "InvalidConfigurationTest*.xml" files are modified copies of
{@literal ConfigurationTest-full-config.xml},
@@ -451,7 +458,7 @@ public class FileConfigurationParserTest extends
ServerTestBase {
@Test
public void testLockCoordinatorParse() throws Exception {
FileConfigurationParser parser = new FileConfigurationParser();
- String configStr = FIRST_PART + LOCK_COORDINATOR_PART + LAST_PART;
+ String configStr = FIRST_PART + LOCK_COORDINATOR_PART +
BROKER_CONNECTION_PART + LAST_PART;
Configuration configuration = parser.parseMainConfig(new
ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8)));
Collection<LockCoordinatorConfiguration> lockConfigurations =
configuration.getLockCoordinatorConfigurations();
@@ -467,6 +474,8 @@ public class FileConfigurationParserTest extends
ServerTestBase {
assertEquals("value2", properties.get("test2"));
}
configuration.getAcceptorConfigurations().stream().filter(f ->
f.getName().equals("netty-with-lock")).forEach(f -> assertEquals("my-lock",
f.getLockCoordinator()));
+ assertEquals(1, configuration.getAMQPConnection().size());
+ assertEquals("my-lock",
configuration.getAMQPConnection().get(0).getLockCoordinator());
}
@Test
diff --git a/docs/user-manual/lock-coordination.adoc
b/docs/user-manual/lock-coordination.adoc
index c5ca2c7466..64ebaeda4d 100644
--- a/docs/user-manual/lock-coordination.adoc
+++ b/docs/user-manual/lock-coordination.adoc
@@ -5,38 +5,39 @@
The Lock Coordinator provides pluggable distributed lock mechanism monitoring.
It allows multiple broker instances to coordinate the activation of specific
configuration elements, ensuring that only one broker instance activates a
particular element at any given time.
+This prevents split-brain scenarios where multiple brokers could
simultaneously process messages, leading to duplicate processing or conflicting
state.
-When a broker acquires a lock through a distributed lock, the associated
configuration elements are activated.
-If the lock is lost or released, those elements are deactivated.
+In the current version, the Lock Coordinator can be applied to control the
startup and shutdown of:
-In the current version, the Lock Coordinator can be applied to control the
startup and shutdown of acceptors.
-When an acceptor is associated with a lock coordinator, it will only start
accepting connections when the broker successfully acquires the distributed
lock.
-If lock is lost for any reason, the acceptor automatically stops accepting new
connections.
+* *Acceptors* - When an acceptor is associated with a lock coordinator, it
will only start accepting connections when the broker successfully acquires the
distributed lock. If the lock is lost for any reason, the acceptor
automatically stops accepting new connections.
-The same pattern used on acceptors may eventually be applied to other
configuration elements.
+* *Broker Connections* - When a broker connection (AMQP mirror, federation, or
bridge) is associated with a lock coordinator, it will only be active when the
broker successfully acquires the distributed lock. If the lock is lost, the
connection is paused, preventing message flow. When the lock is reacquired, the
connection resumes automatically.
+
+The same pattern used on acceptors and broker connections may eventually be
applied to other configuration elements.
If you have ideas for additional use cases where this pattern could be
applied, please file a JIRA issue.
WARNING: This feature is in technical preview and its configuration elements
are subject to possible modifications.
== Configuration
-It is possible to specify multiple lock-coordinators and associate them with
other broker elements.
-
-The broker element associated with a lock-coordinator (e.g., an acceptor) will
only be started if the distributed lock can be acquired.
-If the lock cannot be acquired or is lost, the associated element will be
stopped.
+You can define multiple lock-coordinators and associate them with broker
elements such as acceptors and broker connections.
+When a broker element is associated with a lock-coordinator, it will only
activate when the distributed lock has been acquired.
+If the lock cannot be acquired or is lost, the elements are automatically
stopped or paused.
-This pattern can be used to ensure clients connect to only one of your
mirrored brokers at a time, preventing split-brain scenarios and duplicate
message processing.
+Different lock implementations (File-based, ZooKeeper-based, or custom
plugins) provide different configuration properties.
+See the <<Configuration Options>> section below for detailed tables of
available options for each lock implementation.
-Depending on the provider selector, multiple configuration options can be
provided.
-Please consult the javadoc for your lock implementation.
-A simple table will be provided in this chapter for the two reference
implementations we provide, but this could be a plugin being added to your
broker.
+=== Configuration Example
-In this next example, we configure a broker with:
+The following example demonstrates a star topology HA configuration where only
one broker is active at a time:
-* Two acceptors: one for mirroring traffic (`for-mirroring-only`) and one for
client connections (`for-clients-only`)
-* A File-based lock-coordinator named `clients-lock`
-* The client acceptor associated with the lock-coordinator, so it only
activates when the distributed lock is acquired
-* A mirror connection to another broker for data replication
+* Two acceptors:
+ - `for-mirroring-only` - Always active to receive mirrored data from other
brokers
+ - `for-clients-only` - Controlled by lock-coordinator; only active when lock
is held
+* File-based lock-coordinator named `clients-lock` using shared storage
+* Broker connections controlled by the lock-coordinator (mirrors, bridges, or
federations):
+ - Two mirror connections (`mirrorB` and `mirrorC`) for data replication
+ - One bridge connection for forwarding messages to a remote broker
[,xml]
----
@@ -58,15 +59,44 @@ In this next example, we configure a broker with:
</lock-coordinators>
<broker-connections>
- <amqp-connection uri="tcp://otherBroker:61000" name="mirror"
retry-interval="2000">
+ <amqp-connection uri="tcp://otherBroker:61001" name="mirrorB"
retry-interval="2000" lock-coordinator="clients-lock">
<mirror sync="false"/>
</amqp-connection>
+ <amqp-connection uri="tcp://otherBroker:61002" name="mirrorC"
retry-interval="2000" lock-coordinator="clients-lock">
+ <mirror sync="false"/>
+ </amqp-connection>
+ <amqp-connection uri="tcp://remoteBroker:61616" name="bridgeConnection"
retry-interval="2000" lock-coordinator="clients-lock">
+ <bridge>
+ <bridge-to-queue name="orders" remote-address="orders">
+ <include queue-match="orders" address-match="orders"/>
+ </bridge-to-queue>
+ </bridge>
+ </amqp-connection>
</broker-connections>
----
-In the previous configuration, the broker will use a file lock, and the
acceptor will only be active if it can hold the distributed lock between the
mirrored brokers.
+==== How It Works
+
+When a broker successfully acquires the distributed lock:
+
+* The client acceptor (`for-clients-only`) starts accepting connections
+* Mirror connections activate and begin replicating data to other brokers
+* Bridge connections activate and forward messages to remote brokers
+* Federation connections activate (if configured)
+
+When the lock is lost (due to failure or network partition):
+
+* The client acceptor stops accepting new connections
+* All broker connections (mirrors, bridges, and federations) pause immediately
+* The broker continues to receive mirrored data on the `for-mirroring-only`
acceptor
+
+This ensures only one broker actively processes client requests, replicates
data, and forwards messages at any given time.
+
+==== Important Considerations
+
+WARNING: When mixing bridges or federations with mirrors in the same
configuration, message duplication is possible during failover transitions.
When a message arrives at a mirror target and that node's broker connections
are active, the message may be forwarded even though it was already processed
by the original source. Lock coordination significantly reduces this risk but
cannot completely eliminate it during the brief window when locks are being
transferred between brokers.
image:images/lock-coordination-example.png[HA with mirroring]
@@ -74,9 +104,11 @@ You can find a
https://github.com/apache/artemis-examples/tree/main/examples/fea
== Configuration Options
+All lock-coordinator implementations share common configuration elements and
provide implementation-specific properties.
+
=== Common Configuration
-The following elements are configured on lock-coordinator
+The following elements are configured on every lock-coordinator regardless of
implementation:
[cols="1,1,1,3"]
|===
@@ -103,10 +135,14 @@ The following elements are configured on lock-coordinator
|How often to check if the lock is still valid, in milliseconds
|===
-=== File
+=== File-Based Lock Manager
-The file-based lock uses the file system to manage distributed locks.
-It is provided by the
class-name=`org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager`
+The file-based lock manager uses the file system to manage distributed locks
through file locking mechanisms.
+All brokers must have access to a shared file system location.
+
+**Class name:**
`org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager`
+
+**Properties:**
[cols="1,1,1,3"]
|===
@@ -118,10 +154,14 @@ It is provided by the
class-name=`org.apache.activemq.artemis.lockmanager.file.F
|Path to the directory where lock files will be created and managed. The
directory must be created in advance before using this lock.
|===
-=== ZooKeeper
+=== ZooKeeper-Based Lock Manager
+
+The ZooKeeper-based lock manager uses Apache Curator to manage distributed
locks via a ZooKeeper ensemble.
+This is the recommended approach for production deployments as it provides
better fault tolerance and doesn't require shared storage.
+
+**Class name:**
`org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManager`
-The ZooKeeper-based lock uses Apache Curator to manage distributed locks via
ZooKeeper.
-It is provided by the
class-name=`org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManager`
+**Properties:**
[cols="1,1,1,3"]
|===
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
index 5bc25a7fa0..d6a5925dce 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
@@ -110,6 +110,7 @@ import
org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.ServiceComponent;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerLegacyProducersImpl;
+import org.apache.activemq.artemis.core.server.lock.LockCoordinator;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import
org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
@@ -6453,6 +6454,16 @@ public class ActiveMQServerControlTest extends
ManagementTestBase {
started = true;
}
+ @Override
+ public LockCoordinator getLockCoordinator() {
+ return null;
+ }
+
+ @Override
+ public BrokerConnection setLockCoordinator(LockCoordinator
lockCoordinator) {
+ return null;
+ }
+
@Override
public void stop() throws Exception {
started = false;
diff --git
a/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/A/broker.xml
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/A/broker.xml
new file mode 100644
index 0000000000..bcea27b38a
--- /dev/null
+++
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/A/broker.xml
@@ -0,0 +1,181 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <name>0.0.0.0</name>
+
+ <persistence-enabled>true</persistence-enabled>
+
+ <!-- this could be ASYNCIO or NIO
+ -->
+ <journal-type>NIO</journal-type>
+
+ <paging-directory>./data/paging</paging-directory>
+
+ <bindings-directory>./data/bindings</bindings-directory>
+
+ <journal-directory>./data/journal</journal-directory>
+
+
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>-1</journal-pool-files>
+
+ <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+ <security-enabled>false</security-enabled>
+
+ <!-- how often we are looking for how many bytes are being used on the
disk in ms -->
+ <disk-scan-period>5000</disk-scan-period>
+
+ <!-- once the disk hits this limit the system will block, or close the
connection in certain protocols
+ that won't support flow control. -->
+ <max-disk-usage>90</max-disk-usage>
+
+ <acceptors>
+
+ <!-- useEpoll means: it will use Netty epoll if you are on a system
(Linux) that supports it -->
+ <!-- useKQueue means: it will use Netty kqueue if you are on a system
(MacOS) that supports it -->
+ <!-- amqpCredits: The number of credits sent to AMQP producers -->
+ <!-- amqpLowCredits: The server will send the # credits specified at
amqpCredits at this low mark -->
+
+ <!-- Acceptor for every supported protocol -->
+ <acceptor
name="artemis">tcp://0.0.0.0:61000?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+ <acceptor name="forClients"
lock-coordinator="failover">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+ </acceptors>
+
+ <lock-coordinators>
+ <lock-coordinator name="failover">
+
<class-name>org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManager</class-name>
+ <lock-id>fail</lock-id>
+ <properties>
+ <property key="connect-string" value="localhost:2181"/>
+ </properties>
+ </lock-coordinator>
+ </lock-coordinators>
+
+
+ <broker-connections>
+ <amqp-connection uri="tcp://localhost:61001" name="mirrorB"
retry-interval="2000" lock-coordinator="failover">
+ <mirror sync="false"/>
+ </amqp-connection>
+ <amqp-connection uri="tcp://localhost:61002" name="mirrorC"
retry-interval="2000" lock-coordinator="failover">
+ <mirror sync="false"/>
+ </amqp-connection>
+ <amqp-connection uri="tcp://localhost:61626" name="bridgeTo"
retry-interval="2000" lock-coordinator="failover">
+ <bridge>
+ <bridge-to-queue name="bridgeQueue"
remote-address="bridgeQueue">
+ <include queue-match="bridgeQueue"
address-match="bridgeQueue"/>
+ </bridge-to-queue>
+ </bridge>
+ </amqp-connection>
+ </broker-connections>
+
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="guest"/>
+ <permission type="deleteNonDurableQueue" roles="guest"/>
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createAddress" roles="guest"/>
+ <permission type="deleteAddress" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="browse" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ <!-- we need this otherwise ./artemis data imp wouldn't work -->
+ <permission type="manage" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management.#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <address-setting match="myQueue">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+ <default-max-consumers>1</default-max-consumers>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ"/>
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue"/>
+ </anycast>
+ </address>
+ <address name="myQueue">
+ <anycast>
+ <queue name="myQueue">
+ </queue>
+ </anycast>
+ </address>
+ <address name="bridgeQueue">
+ <anycast>
+ <queue name="bridgeQueue">
+ </queue>
+ </anycast>
+ </address>
+ </addresses>
+
+ </core>
+</configuration>
diff --git
a/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/B/broker.xml
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/B/broker.xml
new file mode 100644
index 0000000000..f00d1f8b54
--- /dev/null
+++
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/B/broker.xml
@@ -0,0 +1,181 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <name>0.0.0.0</name>
+
+ <persistence-enabled>true</persistence-enabled>
+
+ <!-- this could be ASYNCIO or NIO
+ -->
+ <journal-type>NIO</journal-type>
+
+ <paging-directory>./data/paging</paging-directory>
+
+ <bindings-directory>./data/bindings</bindings-directory>
+
+ <journal-directory>./data/journal</journal-directory>
+
+
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>-1</journal-pool-files>
+
+ <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+ <security-enabled>false</security-enabled>
+
+ <!-- how often we are looking for how many bytes are being used on the
disk in ms -->
+ <disk-scan-period>5000</disk-scan-period>
+
+ <!-- once the disk hits this limit the system will block, or close the
connection in certain protocols
+ that won't support flow control. -->
+ <max-disk-usage>90</max-disk-usage>
+
+ <acceptors>
+
+ <!-- useEpoll means: it will use Netty epoll if you are on a system
(Linux) that supports it -->
+ <!-- useKQueue means: it will use Netty kqueue if you are on a system
(MacOS) that supports it -->
+ <!-- amqpCredits: The number of credits sent to AMQP producers -->
+ <!-- amqpLowCredits: The server will send the # credits specified at
amqpCredits at this low mark -->
+
+ <!-- Acceptor for every supported protocol -->
+ <acceptor
name="artemis">tcp://0.0.0.0:61001?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+ <acceptor name="forClients"
lock-coordinator="failover">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+ </acceptors>
+
+ <lock-coordinators>
+ <lock-coordinator name="failover">
+
<class-name>org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManager</class-name>
+ <lock-id>fail</lock-id>
+ <properties>
+ <property key="connect-string" value="localhost:2181"/>
+ </properties>
+ </lock-coordinator>
+ </lock-coordinators>
+
+
+ <broker-connections>
+ <amqp-connection uri="tcp://localhost:61000" name="mirrorA"
retry-interval="2000" lock-coordinator="failover">
+ <mirror sync="false"/>
+ </amqp-connection>
+ <amqp-connection uri="tcp://localhost:61002" name="mirrorC"
retry-interval="2000" lock-coordinator="failover">
+ <mirror sync="false"/>
+ </amqp-connection>
+ <amqp-connection uri="tcp://localhost:61626" name="bridgeTo"
retry-interval="2000" lock-coordinator="failover">
+ <bridge>
+ <bridge-to-queue name="bridgeQueue"
remote-address="bridgeQueue">
+ <include queue-match="bridgeQueue"
address-match="bridgeQueue"/>
+ </bridge-to-queue>
+ </bridge>
+ </amqp-connection>
+ </broker-connections>
+
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="guest"/>
+ <permission type="deleteNonDurableQueue" roles="guest"/>
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createAddress" roles="guest"/>
+ <permission type="deleteAddress" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="browse" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ <!-- we need this otherwise ./artemis data imp wouldn't work -->
+ <permission type="manage" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management.#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <address-setting match="myQueue">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+ <default-max-consumers>1</default-max-consumers>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ"/>
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue"/>
+ </anycast>
+ </address>
+ <address name="myQueue">
+ <anycast>
+ <queue name="myQueue">
+ </queue>
+ </anycast>
+ </address>
+ <address name="bridgeQueue">
+ <anycast>
+ <queue name="bridgeQueue">
+ </queue>
+ </anycast>
+ </address>
+ </addresses>
+
+ </core>
+</configuration>
diff --git
a/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/C/broker.xml
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/C/broker.xml
new file mode 100644
index 0000000000..da8b162c9c
--- /dev/null
+++
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/C/broker.xml
@@ -0,0 +1,181 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <name>0.0.0.0</name>
+
+ <persistence-enabled>true</persistence-enabled>
+
+ <!-- this could be ASYNCIO or NIO
+ -->
+ <journal-type>NIO</journal-type>
+
+ <paging-directory>./data/paging</paging-directory>
+
+ <bindings-directory>./data/bindings</bindings-directory>
+
+ <journal-directory>./data/journal</journal-directory>
+
+
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>-1</journal-pool-files>
+
+ <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+ <security-enabled>false</security-enabled>
+
+ <!-- how often we are looking for how many bytes are being used on the
disk in ms -->
+ <disk-scan-period>5000</disk-scan-period>
+
+ <!-- once the disk hits this limit the system will block, or close the
connection in certain protocols
+ that won't support flow control. -->
+ <max-disk-usage>90</max-disk-usage>
+
+ <acceptors>
+
+ <!-- useEpoll means: it will use Netty epoll if you are on a system
(Linux) that supports it -->
+ <!-- useKQueue means: it will use Netty kqueue if you are on a system
(MacOS) that supports it -->
+ <!-- amqpCredits: The number of credits sent to AMQP producers -->
+ <!-- amqpLowCredits: The server will send the # credits specified at
amqpCredits at this low mark -->
+
+ <!-- Acceptor for every supported protocol -->
+ <acceptor
name="artemis">tcp://0.0.0.0:61002?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+ <acceptor name="forClients"
lock-coordinator="failover">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+ </acceptors>
+
+ <lock-coordinators>
+ <lock-coordinator name="failover">
+
<class-name>org.apache.activemq.artemis.lockmanager.zookeeper.CuratorDistributedLockManager</class-name>
+ <lock-id>fail</lock-id>
+ <properties>
+ <property key="connect-string" value="localhost:2181"/>
+ </properties>
+ </lock-coordinator>
+ </lock-coordinators>
+
+
+ <broker-connections>
+ <amqp-connection uri="tcp://localhost:61000" name="mirrorA"
retry-interval="2000" lock-coordinator="failover">
+ <mirror sync="false"/>
+ </amqp-connection>
+ <amqp-connection uri="tcp://localhost:61001" name="mirrorB"
retry-interval="2000" lock-coordinator="failover">
+ <mirror sync="false"/>
+ </amqp-connection>
+ <amqp-connection uri="tcp://localhost:61626" name="bridgeTo"
retry-interval="2000" lock-coordinator="failover">
+ <bridge>
+ <bridge-to-queue name="bridgeQueue"
remote-address="bridgeQueue">
+ <include queue-match="bridgeQueue"
address-match="bridgeQueue"/>
+ </bridge-to-queue>
+ </bridge>
+ </amqp-connection>
+ </broker-connections>
+
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="guest"/>
+ <permission type="deleteNonDurableQueue" roles="guest"/>
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createAddress" roles="guest"/>
+ <permission type="deleteAddress" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="browse" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ <!-- we need this otherwise ./artemis data imp wouldn't work -->
+ <permission type="manage" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management.#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <address-setting match="myQueue">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+ <default-max-consumers>1</default-max-consumers>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ"/>
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue"/>
+ </anycast>
+ </address>
+ <address name="myQueue">
+ <anycast>
+ <queue name="myQueue">
+ </queue>
+ </anycast>
+ </address>
+ <address name="bridgeQueue">
+ <anycast>
+ <queue name="bridgeQueue">
+ </queue>
+ </anycast>
+ </address>
+ </addresses>
+
+ </core>
+</configuration>
diff --git
a/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/A/broker.xml
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/A/broker.xml
new file mode 100644
index 0000000000..99fb7c6e4d
--- /dev/null
+++
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/A/broker.xml
@@ -0,0 +1,182 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <name>0.0.0.0</name>
+
+ <persistence-enabled>true</persistence-enabled>
+
+ <!-- this could be ASYNCIO or NIO
+ -->
+ <journal-type>NIO</journal-type>
+
+ <paging-directory>./data/paging</paging-directory>
+
+ <bindings-directory>./data/bindings</bindings-directory>
+
+ <journal-directory>./data/journal</journal-directory>
+
+
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>-1</journal-pool-files>
+
+ <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+ <security-enabled>false</security-enabled>
+
+ <!-- how often we are looking for how many bytes are being used on the
disk in ms -->
+ <disk-scan-period>5000</disk-scan-period>
+
+ <!-- once the disk hits this limit the system will block, or close the
connection in certain protocols
+ that won't support flow control. -->
+ <max-disk-usage>90</max-disk-usage>
+
+ <acceptors>
+
+ <!-- useEpoll means: it will use Netty epoll if you are on a system
(Linux) that supports it -->
+ <!-- useKQueue means: it will use Netty kqueue if you are on a system
(MacOS) that supports it -->
+ <!-- amqpCredits: The number of credits sent to AMQP producers -->
+ <!-- amqpLowCredits: The server will send the # credits specified at
amqpCredits at this low mark -->
+
+ <!-- Acceptor for every supported protocol -->
+ <acceptor
name="artemis">tcp://0.0.0.0:61000?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+ <acceptor name="forClients"
lock-coordinator="failover">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+ </acceptors>
+
+ <lock-coordinators>
+ <lock-coordinator name="failover">
+
<class-name>org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager</class-name>
+ <lock-id>fail</lock-id>
+ <check-period>5000</check-period>
+ <properties>
+ <property key="locks-folder" value="CHANGEME"/>
+ </properties>
+ </lock-coordinator>
+ </lock-coordinators>
+
+
+ <broker-connections>
+ <amqp-connection uri="tcp://localhost:61001" name="mirrorB"
retry-interval="2000" lock-coordinator="failover">
+ <mirror sync="false"/>
+ </amqp-connection>
+ <amqp-connection uri="tcp://localhost:61002" name="mirrorC"
retry-interval="2000" lock-coordinator="failover">
+ <mirror sync="false"/>
+ </amqp-connection>
+ <amqp-connection uri="tcp://localhost:61626" name="bridgeTo"
retry-interval="2000" lock-coordinator="failover">
+ <bridge>
+ <bridge-to-queue name="bridgeQueue"
remote-address="bridgeQueue">
+ <include queue-match="bridgeQueue"
address-match="bridgeQueue"/>
+ </bridge-to-queue>
+ </bridge>
+ </amqp-connection>
+ </broker-connections>
+
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="guest"/>
+ <permission type="deleteNonDurableQueue" roles="guest"/>
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createAddress" roles="guest"/>
+ <permission type="deleteAddress" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="browse" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ <!-- we need this otherwise ./artemis data imp wouldn't work -->
+ <permission type="manage" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management.#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <address-setting match="myQueue">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+ <default-max-consumers>1</default-max-consumers>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ"/>
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue"/>
+ </anycast>
+ </address>
+ <address name="myQueue">
+ <anycast>
+ <queue name="myQueue">
+ </queue>
+ </anycast>
+ </address>
+ <address name="bridgeQueue">
+ <anycast>
+ <queue name="bridgeQueue">
+ </queue>
+ </anycast>
+ </address>
+ </addresses>
+
+ </core>
+</configuration>
diff --git
a/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/B/broker.xml
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/B/broker.xml
new file mode 100644
index 0000000000..2279eab149
--- /dev/null
+++
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/B/broker.xml
@@ -0,0 +1,182 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <name>0.0.0.0</name>
+
+ <persistence-enabled>true</persistence-enabled>
+
+ <!-- this could be ASYNCIO or NIO
+ -->
+ <journal-type>NIO</journal-type>
+
+ <paging-directory>./data/paging</paging-directory>
+
+ <bindings-directory>./data/bindings</bindings-directory>
+
+ <journal-directory>./data/journal</journal-directory>
+
+
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>-1</journal-pool-files>
+
+ <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+ <security-enabled>false</security-enabled>
+
+ <!-- how often we are looking for how many bytes are being used on the
disk in ms -->
+ <disk-scan-period>5000</disk-scan-period>
+
+ <!-- once the disk hits this limit the system will block, or close the
connection in certain protocols
+ that won't support flow control. -->
+ <max-disk-usage>90</max-disk-usage>
+
+ <acceptors>
+
+ <!-- useEpoll means: it will use Netty epoll if you are on a system
(Linux) that supports it -->
+ <!-- useKQueue means: it will use Netty kqueue if you are on a system
(MacOS) that supports it -->
+ <!-- amqpCredits: The number of credits sent to AMQP producers -->
+ <!-- amqpLowCredits: The server will send the # credits specified at
amqpCredits at this low mark -->
+
+ <!-- Acceptor for every supported protocol -->
+ <acceptor
name="artemis">tcp://0.0.0.0:61001?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+ <acceptor name="forClients"
lock-coordinator="failover">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+ </acceptors>
+
+ <lock-coordinators>
+ <lock-coordinator name="failover">
+
<class-name>org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager</class-name>
+ <lock-id>fail</lock-id>
+ <check-period>5000</check-period>
+ <properties>
+ <property key="locks-folder" value="CHANGEME"/>
+ </properties>
+ </lock-coordinator>
+ </lock-coordinators>
+
+
+ <broker-connections>
+ <amqp-connection uri="tcp://localhost:61000" name="mirrorA"
retry-interval="2000" lock-coordinator="failover">
+ <mirror sync="false"/>
+ </amqp-connection>
+ <amqp-connection uri="tcp://localhost:61002" name="mirrorC"
retry-interval="2000" lock-coordinator="failover">
+ <mirror sync="false"/>
+ </amqp-connection>
+ <amqp-connection uri="tcp://localhost:61626" name="bridgeTo"
retry-interval="2000" lock-coordinator="failover">
+ <bridge>
+ <bridge-to-queue name="bridgeQueue"
remote-address="bridgeQueue">
+ <include queue-match="bridgeQueue"
address-match="bridgeQueue"/>
+ </bridge-to-queue>
+ </bridge>
+ </amqp-connection>
+ </broker-connections>
+
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="guest"/>
+ <permission type="deleteNonDurableQueue" roles="guest"/>
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createAddress" roles="guest"/>
+ <permission type="deleteAddress" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="browse" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ <!-- we need this otherwise ./artemis data imp wouldn't work -->
+ <permission type="manage" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management.#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <address-setting match="myQueue">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+ <default-max-consumers>1</default-max-consumers>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ"/>
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue"/>
+ </anycast>
+ </address>
+ <address name="myQueue">
+ <anycast>
+ <queue name="myQueue">
+ </queue>
+ </anycast>
+ </address>
+ <address name="bridgeQueue">
+ <anycast>
+ <queue name="bridgeQueue">
+ </queue>
+ </anycast>
+ </address>
+ </addresses>
+
+ </core>
+</configuration>
diff --git
a/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/C/broker.xml
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/C/broker.xml
new file mode 100644
index 0000000000..5bad7cebee
--- /dev/null
+++
b/tests/smoke-tests/src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/C/broker.xml
@@ -0,0 +1,182 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <name>0.0.0.0</name>
+
+ <persistence-enabled>true</persistence-enabled>
+
+ <!-- this could be ASYNCIO or NIO
+ -->
+ <journal-type>NIO</journal-type>
+
+ <paging-directory>./data/paging</paging-directory>
+
+ <bindings-directory>./data/bindings</bindings-directory>
+
+ <journal-directory>./data/journal</journal-directory>
+
+
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>-1</journal-pool-files>
+
+ <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+ <security-enabled>false</security-enabled>
+
+ <!-- how often we are looking for how many bytes are being used on the
disk in ms -->
+ <disk-scan-period>5000</disk-scan-period>
+
+ <!-- once the disk hits this limit the system will block, or close the
connection in certain protocols
+ that won't support flow control. -->
+ <max-disk-usage>90</max-disk-usage>
+
+ <acceptors>
+
+ <!-- useEpoll means: it will use Netty epoll if you are on a system
(Linux) that supports it -->
+ <!-- useKQueue means: it will use Netty kqueue if you are on a system
(MacOS) that supports it -->
+ <!-- amqpCredits: The number of credits sent to AMQP producers -->
+ <!-- amqpLowCredits: The server will send the # credits specified at
amqpCredits at this low mark -->
+
+ <!-- Acceptor for every supported protocol -->
+ <acceptor
name="artemis">tcp://0.0.0.0:61002?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+ <acceptor name="forClients"
lock-coordinator="failover">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>
+ </acceptors>
+
+ <lock-coordinators>
+ <lock-coordinator name="failover">
+
<class-name>org.apache.activemq.artemis.lockmanager.file.FileBasedLockManager</class-name>
+ <lock-id>fail</lock-id>
+ <check-period>5000</check-period>
+ <properties>
+ <property key="locks-folder" value="CHANGEME"/>
+ </properties>
+ </lock-coordinator>
+ </lock-coordinators>
+
+
+ <broker-connections>
+ <amqp-connection uri="tcp://localhost:61000" name="mirrorA"
retry-interval="2000" lock-coordinator="failover">
+ <mirror sync="false"/>
+ </amqp-connection>
+ <amqp-connection uri="tcp://localhost:61001" name="mirrorB"
retry-interval="2000" lock-coordinator="failover">
+ <mirror sync="false"/>
+ </amqp-connection>
+ <amqp-connection uri="tcp://localhost:61626" name="bridgeTo"
retry-interval="2000" lock-coordinator="failover">
+ <bridge>
+ <bridge-to-queue name="bridgeQueue"
remote-address="bridgeQueue">
+ <include queue-match="bridgeQueue"
address-match="bridgeQueue"/>
+ </bridge-to-queue>
+ </bridge>
+ </amqp-connection>
+ </broker-connections>
+
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="guest"/>
+ <permission type="deleteNonDurableQueue" roles="guest"/>
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createAddress" roles="guest"/>
+ <permission type="deleteAddress" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="browse" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ <!-- we need this otherwise ./artemis data imp wouldn't work -->
+ <permission type="manage" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management.#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ <address-setting match="myQueue">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+ <default-max-consumers>1</default-max-consumers>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ"/>
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue"/>
+ </anycast>
+ </address>
+ <address name="myQueue">
+ <anycast>
+ <queue name="myQueue">
+ </queue>
+ </anycast>
+ </address>
+ <address name="bridgeQueue">
+ <anycast>
+ <queue name="bridgeQueue">
+ </queue>
+ </anycast>
+ </address>
+ </addresses>
+
+ </core>
+</configuration>
diff --git
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java
index 2af144d8fb..426f4e503d 100644
---
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java
+++
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/LockCoordinatorTest.java
@@ -106,6 +106,7 @@ public class LockCoordinatorTest extends ActiveMQTestBase {
testAddAfterLocked(lockCoordinatorSupplier.apply(1).get(0));
testRetryAfterError(lockCoordinatorSupplier.apply(1).get(0));
testRetryAfterErrorWithDelayAdd(lockCoordinatorSupplier.apply(1).get(0));
+ testPriorityOrdering(lockCoordinatorSupplier.apply(1).get(0));
{
List<LockCoordinator> list = lockCoordinatorSupplier.apply(2);
@@ -179,6 +180,41 @@ public class LockCoordinatorTest extends ActiveMQTestBase {
}
}
+ private void testPriorityOrdering(LockCoordinator lockCoordinator) throws
Exception {
+ lockHolderCount.set(0);
+ lockChanged.set(0);
+
+ try {
+ // Add callbacks with different priorities (lower values execute
first)
+ List<Integer> executionOrder = new ArrayList<>();
+ lockCoordinator.onLockAcquired(() -> executionOrder.add(3), 30);
+ lockCoordinator.onLockAcquired(() -> executionOrder.add(1), 10);
+ lockCoordinator.onLockAcquired(() -> executionOrder.add(2), 20);
+ lockCoordinator.onLockAcquired(() -> executionOrder.add(4), 40);
+
+ List<Integer> releaseOrder = new ArrayList<>();
+ lockCoordinator.onLockReleased(() -> releaseOrder.add(2), 20);
+ lockCoordinator.onLockReleased(() -> releaseOrder.add(4), 40);
+ lockCoordinator.onLockReleased(() -> releaseOrder.add(1), 10);
+ lockCoordinator.onLockReleased(() -> releaseOrder.add(3), 30);
+
+ lockCoordinator.start();
+ Wait.assertEquals(1, () -> lockHolderCount.get(), 15000, 100);
+
+ // Verify acquired callbacks executed in priority order (lowest first)
+ Wait.assertEquals(4, executionOrder::size, 5000, 100);
+ assertEquals(List.of(1, 2, 3, 4), executionOrder);
+
+ lockCoordinator.stop();
+
+ // Verify released callbacks executed in priority order (lowest first)
+ Wait.assertEquals(4, releaseOrder::size, 5000, 100);
+ assertEquals(List.of(1, 2, 3, 4), releaseOrder);
+ } finally {
+ lockCoordinator.stop();
+ }
+ }
+
// validate that no retry would happen since the lock wasn't held in the
secondLock
private void testNoRetryWhileNotAcquired(LockCoordinator firstLock,
LockCoordinator secondLock) throws Exception {
lockHolderCount.set(0);
diff --git
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/StarMirrorSingleAcceptorRunningTest.java
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/StarMirrorSingleAcceptorRunningTest.java
new file mode 100644
index 0000000000..5078687005
--- /dev/null
+++
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/lockmanager/StarMirrorSingleAcceptorRunningTest.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.activemq.artemis.tests.smoke.lockmanager;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.function.Consumer;
+
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import org.apache.activemq.artemis.cli.commands.helper.HelperCreate;
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class StarMirrorSingleAcceptorRunningTest extends SmokeTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public static final String SERVER_NAME_WITH_ZK_A =
"lockmanager/starMirrorSingleAcceptor/ZK/A";
+ public static final String SERVER_NAME_WITH_ZK_B =
"lockmanager/starMirrorSingleAcceptor/ZK/B";
+ public static final String SERVER_NAME_WITH_ZK_C =
"lockmanager/starMirrorSingleAcceptor/ZK/C";
+
+ public static final String SERVER_NAME_WITH_FILE_A =
"lockmanager/starMirrorSingleAcceptor/file/A";
+ public static final String SERVER_NAME_WITH_FILE_B =
"lockmanager/starMirrorSingleAcceptor/file/B";
+ public static final String SERVER_NAME_WITH_FILE_C =
"lockmanager/starMirrorSingleAcceptor/file/C";
+
+
+ public static final String SERVER_NAME_BRIDGE_TARGET =
"lockmanager/starMirrorSingleAcceptor/bridgeTarget";
+
+ // Test constants
+ private static final int ALTERNATING_TEST_ITERATIONS = 3;
+ private static final int MESSAGES_SENT_PER_ITERATION = 100;
+ private static final int BRIDGE_MESSAGES_SENT_PER_ITERATION = 7;
+ private static final int MESSAGES_CONSUMED_PER_ITERATION = 17;
+ private static final int MESSAGES_REMAINING_PER_ITERATION =
MESSAGES_SENT_PER_ITERATION - MESSAGES_CONSUMED_PER_ITERATION;
+ private static final int EXPECTED_FINAL_MESSAGE_COUNT =
ALTERNATING_TEST_ITERATIONS * MESSAGES_REMAINING_PER_ITERATION;
+
+
+ private static final int ZK_BASE_PORT = 2181;
+
+ Process processA;
+ Process processB;
+ Process processC;
+ Process bridgeTargetProcess;
+
+ private static void customizeFileServer(File serverLocation, File fileLock)
{
+ try {
+ FileUtil.findReplace(new File(serverLocation, "/etc/broker.xml"),
"CHANGEME", fileLock.getAbsolutePath());
+ } catch (Throwable e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ private static void createServers(String serverNameA, String serverNameB,
String serverNameC,
+ String configPathA, String
configPathB, String configPathC,
+ Consumer<File> customizeServer) throws
Exception {
+ File serverLocationA = getFileServerLocation(serverNameA);
+ File serverLocationB = getFileServerLocation(serverNameB);
+ File serverLocationC = getFileServerLocation(serverNameC);
+ deleteDirectory(serverLocationA);
+ deleteDirectory(serverLocationB);
+ deleteDirectory(serverLocationC);
+
+ createSingleServer(serverLocationA, configPathA, customizeServer);
+ createSingleServer(serverLocationB, configPathB, customizeServer);
+ createSingleServer(serverLocationC, configPathC, customizeServer);
+
+ File bridgeTarget = getFileServerLocation(SERVER_NAME_BRIDGE_TARGET);
+ createBridgeTarget(bridgeTarget);
+ }
+
+ private static void createSingleServer(File serverLocation, String
configPath,
+ Consumer<File> customizeServer)
throws Exception {
+ HelperCreate cliCreateServer = helperCreate();
+ cliCreateServer.setAllowAnonymous(true)
+ .setUser("admin")
+ .setPassword("adming")
+ .setNoWeb(true)
+ .setConfiguration(configPath)
+ .setArtemisInstance(serverLocation);
+ cliCreateServer.createServer();
+
+ if (customizeServer != null) {
+ customizeServer.accept(serverLocation);
+ }
+ }
+
+
+ private static void createBridgeTarget(File serverLocation) throws
Exception {
+ HelperCreate cliCreateServer = helperCreate();
+ cliCreateServer.setAllowAnonymous(true)
+ .setUser("admin")
+ .setPassword("admin")
+ .setNoWeb(true)
+ .setPortOffset(10)
+ .setArtemisInstance(serverLocation);
+ cliCreateServer.addArgs("--queues", "bridgeQueue");
+ cliCreateServer.createServer();
+ }
+
+
+
+ @BeforeEach
+ public void prepareServers() throws Exception {
+
+ }
+
+ @Test
+ public void testAlternatingZK() throws Throwable {
+ {
+ createServers(SERVER_NAME_WITH_ZK_A, SERVER_NAME_WITH_ZK_B,
SERVER_NAME_WITH_ZK_C,
+
"./src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/A",
+
"./src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/B",
+
"./src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/ZK/C",
+ null);
+
+ cleanupData(SERVER_NAME_WITH_ZK_A);
+ cleanupData(SERVER_NAME_WITH_ZK_B);
+ cleanupData(SERVER_NAME_WITH_ZK_C);
+ cleanupData(SERVER_NAME_BRIDGE_TARGET);
+ }
+
+ // starting zookeeper
+ ZookeeperCluster zkCluster = new ZookeeperCluster(temporaryFolder, 1,
ZK_BASE_PORT, 100);
+ zkCluster.start();
+ runAfter(zkCluster::stop);
+
+ testAlternating(SERVER_NAME_WITH_ZK_A, SERVER_NAME_WITH_ZK_B,
SERVER_NAME_WITH_ZK_C);
+ }
+
+ @Test
+ public void testAlternatingFile() throws Throwable {
+ File fileLock = new File("./target/serverLock");
+ fileLock.mkdirs();
+
+ {
+ createServers(SERVER_NAME_WITH_FILE_A, SERVER_NAME_WITH_FILE_B,
SERVER_NAME_WITH_FILE_C,
+
"./src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/A",
+
"./src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/B",
+
"./src/main/resources/servers/lockmanager/starMirrorSingleAcceptor/file/C",
+ s -> customizeFileServer(s, fileLock));
+
+ cleanupData(SERVER_NAME_WITH_FILE_A);
+ cleanupData(SERVER_NAME_WITH_FILE_B);
+ cleanupData(SERVER_NAME_WITH_FILE_C);
+ cleanupData(SERVER_NAME_BRIDGE_TARGET);
+ }
+
+ testAlternating(SERVER_NAME_WITH_FILE_A, SERVER_NAME_WITH_FILE_B,
SERVER_NAME_WITH_FILE_C);
+ }
+
+ public void testAlternating(String nameServerA, String nameServerB, String
nameServerC) throws Throwable {
+ processA = startServer(nameServerA, 0, 60_000);
+ processB = startServer(nameServerB, 0, -1);
+ processC = startServer(nameServerC, 0, -1);
+ bridgeTargetProcess = startServer(SERVER_NAME_BRIDGE_TARGET, 0, -1);
+
+ ConnectionFactory cfX = CFUtil.createConnectionFactory("amqp",
"tcp://localhost:61616");
+
+ final String queueName = "myQueue";
+ final String bridgeQueue = "bridgeQueue";
+
+ validateStar(cfX);
+
+ for (int i = 0; i < ALTERNATING_TEST_ITERATIONS; i++) {
+ String activeServer = (i % 3 == 0) ? "A" : (i % 3 == 1) ? "B" : "C";
+ logger.info("Iteration {}: Server {} active", i, activeServer);
+
+ if (i % 3 == 0) {
+ // Iteration 0, 3, 6, ...: Server A active, kill Server B and C
+ killServer(processB);
+ killServer(processC);
+ } else if (i % 3 == 1) {
+ // Iteration 1, 4, 7, ...: Server B active, kill Server A and C
+ killServer(processA);
+ killServer(processC);
+ } else {
+ // Iteration 2, 5, 8, ...: Server C active, kill Server A and B
+ killServer(processA);
+ killServer(processB);
+ }
+
+ // Send messages through the shared acceptor
+ sendMessages(cfX, queueName, MESSAGES_SENT_PER_ITERATION);
+
+ // Consume some messages
+ receiveMessages(cfX, queueName, MESSAGES_CONSUMED_PER_ITERATION);
+
+ // Restart the killed servers
+ if (i % 3 == 0) {
+ processB = startServer(nameServerB, 0, -1);
+ processC = startServer(nameServerC, 0, -1);
+ } else if (i % 3 == 1) {
+ processA = startServer(nameServerA, 0, -1);
+ processC = startServer(nameServerC, 0, -1);
+ } else {
+ processA = startServer(nameServerA, 0, -1);
+ processB = startServer(nameServerB, 0, -1);
+ }
+
+ assertEmptySNFs();
+
+ assertMessageCount("tcp://localhost:61000", queueName,
MESSAGES_REMAINING_PER_ITERATION * (i + 1));
+ assertMessageCount("tcp://localhost:61001", queueName,
MESSAGES_REMAINING_PER_ITERATION * (i + 1));
+ assertMessageCount("tcp://localhost:61002", queueName,
MESSAGES_REMAINING_PER_ITERATION * (i + 1));
+
+ sendMessages(cfX, bridgeQueue, BRIDGE_MESSAGES_SENT_PER_ITERATION);
+ assertMessageCount("tcp://localhost:61626", bridgeQueue,
BRIDGE_MESSAGES_SENT_PER_ITERATION * (i + 1));
+ assertMessageCount("tcp://localhost:61000", bridgeQueue, 0);
+ assertMessageCount("tcp://localhost:61001", bridgeQueue, 0);
+ assertMessageCount("tcp://localhost:61002", bridgeQueue, 0);
+ }
+
+ // Verify they all have the expected message count (iterations × (sent -
consumed))
+ assertMessageCount("tcp://localhost:61000", queueName,
EXPECTED_FINAL_MESSAGE_COUNT);
+ assertMessageCount("tcp://localhost:61001", queueName,
EXPECTED_FINAL_MESSAGE_COUNT);
+ assertMessageCount("tcp://localhost:61002", queueName,
EXPECTED_FINAL_MESSAGE_COUNT);
+
+ assertMessageCount("tcp://localhost:61000", bridgeQueue, 0);
+ assertMessageCount("tcp://localhost:61001", bridgeQueue, 0);
+ assertMessageCount("tcp://localhost:61002", bridgeQueue, 0);
+
+ assertMessageCount("tcp://localhost:61626", bridgeQueue,
BRIDGE_MESSAGES_SENT_PER_ITERATION * ALTERNATING_TEST_ITERATIONS);
+
+ receiveMessages(cfX, queueName, EXPECTED_FINAL_MESSAGE_COUNT);
+ assertMessageCount("tcp://localhost:61000", queueName, 0);
+ assertMessageCount("tcp://localhost:61001", queueName, 0);
+ assertMessageCount("tcp://localhost:61002", queueName, 0);
+ assertEmptySNFs();
+
+ validateStar(cfX);
+ }
+
+ private void assertEmptySNFs() throws Exception {
+ assertMessageCount("tcp://localhost:61000",
"$ACTIVEMQ_ARTEMIS_MIRROR_mirrorB", 0);
+ assertMessageCount("tcp://localhost:61000",
"$ACTIVEMQ_ARTEMIS_MIRROR_mirrorC", 0);
+
+ assertMessageCount("tcp://localhost:61001",
"$ACTIVEMQ_ARTEMIS_MIRROR_mirrorA", 0);
+ assertMessageCount("tcp://localhost:61001",
"$ACTIVEMQ_ARTEMIS_MIRROR_mirrorC", 0);
+
+ assertMessageCount("tcp://localhost:61002",
"$ACTIVEMQ_ARTEMIS_MIRROR_mirrorA", 0);
+ assertMessageCount("tcp://localhost:61002",
"$ACTIVEMQ_ARTEMIS_MIRROR_mirrorB", 0);
+ }
+
+ private void validateStar(ConnectionFactory cfX) throws Exception {
+ // validate the star combination
+ sendMessages(cfX, "myQueue", 1_000);
+ assertMessageCount("tcp://localhost:61000", "myQueue", 1_000);
+ assertMessageCount("tcp://localhost:61001", "myQueue", 1_000);
+ assertMessageCount("tcp://localhost:61002", "myQueue", 1_000);
+
+ // validate the star combination
+ receiveMessages(cfX, "myQueue", 1_000);
+ assertMessageCount("tcp://localhost:61000", "myQueue", 0);
+ assertMessageCount("tcp://localhost:61001", "myQueue", 0);
+ assertMessageCount("tcp://localhost:61002", "myQueue", 0);
+ }
+
+ private static void sendMessages(ConnectionFactory cfX, String queueName,
int nmessages) throws JMSException {
+ try (Connection connectionX = retryUntilIsLive(cfX)) {
+ Session sessionX = connectionX.createSession(true,
Session.SESSION_TRANSACTED);
+ Queue queue = sessionX.createQueue(queueName);
+ MessageProducer producerX = sessionX.createProducer(queue);
+ for (int i = 0; i < nmessages; i++) {
+ producerX.send(sessionX.createTextMessage("hello " + i));
+ }
+ sessionX.commit();
+ }
+ }
+
+ private static Connection retryUntilIsLive(ConnectionFactory cfX) {
+ final int maxRetry = 1000;
+ for (int i = 0; i < maxRetry; i++) {
+ try {
+ return cfX.createConnection();
+ } catch (Exception ex) {
+ logger.info("Exception during connection, retrying the
connection... {} out of {} retries, message = {}", i, maxRetry,
ex.getMessage());
+ try {
+ Thread.sleep(500);
+ } catch (Throwable e) {
+ }
+ }
+ }
+ fail("Could not connect after " + maxRetry + " retries");
+ return null; // never happening, fail will throw an exception
+ }
+
+ private static void receiveMessages(ConnectionFactory cfX, String
queueName, int nmessages) throws JMSException {
+ try (Connection connectionX = retryUntilIsLive(cfX)) {
+ connectionX.start();
+ Session sessionX = connectionX.createSession(true,
Session.SESSION_TRANSACTED);
+ Queue queue = sessionX.createQueue(queueName);
+ MessageConsumer consumerX = sessionX.createConsumer(queue);
+ for (int i = 0; i < nmessages; i++) {
+ TextMessage message = (TextMessage) consumerX.receive(5000);
+ assertNotNull(message, "Expected message " + i + " but got null");
+ }
+ sessionX.commit();
+ }
+ }
+
+ protected void assertMessageCount(String uri, String queueName, long count)
throws Exception {
+ SimpleManagement simpleManagement = new SimpleManagement(uri, null,
null);
+ Wait.assertEquals(count, () -> {
+ try {
+ long result = simpleManagement.getMessageCountOnQueue(queueName);
+ if (count != result) {
+ logger.info("validating {} on queue {} expecting count = {},
result = {}", uri, queueName, count, result);
+ }
+ return result;
+ } catch (Throwable e) {
+ return -1;
+ }
+ });
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]