This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new c523458a9a ARTEMIS-4758 Hardening Mirroring
c523458a9a is described below
commit c523458a9aa4f67ad0e9bdbc5c4733bc88bf55f6
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu May 2 10:28:42 2024 -0400
ARTEMIS-4758 Hardening Mirroring
This is a list of improvements done as part of this commit / task:
* Page Transactions on mirror target are now optional.
If you had an interrupt mirror while the target destination was paging,
duplicate detection would be ineffective unless you used paged transactions
Users can now configure the ack manager retries intervals.
Say you need some time to remove a consumer from a target mirror. The
delivering references would prevent acks from happening. You can allow bigger
retry intervals and number of retries by tinkiering with ack manager retry
parameters.
* AckManager restarted independent of incoming acks
The ackManager was only restarted when new acks were coming in. If you
stopped receiving acks on a target server and restarted that server with
pending acks, those acks would never be exercised. The AckManager is now
restarted as soon as the server is started.
---
.../core/server/ActiveMQScheduledComponent.java | 15 +-
.../utils/ActiveMQScheduledComponentTest.java | 46 +++
.../api/config/ActiveMQDefaultConfiguration.java | 32 +++
.../core/journal/collections/JournalHashMap.java | 5 +
.../amqp/broker/ProtonProtocolManagerFactory.java | 3 +-
.../connect/mirror/AMQPMirrorControllerSource.java | 3 +
.../connect/mirror/AMQPMirrorControllerTarget.java | 12 +-
.../protocol/amqp/connect/mirror/AckManager.java | 80 +++---
.../amqp/connect/mirror/AckManagerProvider.java | 8 +-
.../amqp/connect/mirror/MirrorTransaction.java | 11 +
.../artemis/core/config/Configuration.java | 26 ++
.../core/config/impl/ConfigurationImpl.java | 58 ++++
.../deployers/impl/FileConfigurationParser.java | 16 ++
.../artemis/core/paging/impl/PagingStoreImpl.java | 5 +-
.../persistence/impl/journal/codec/AckRetry.java | 14 +-
.../artemis/core/transaction/Transaction.java | 4 +
.../resources/schema/artemis-configuration.xsd | 42 +++
.../core/config/impl/FileConfigurationTest.java | 5 +
.../resources/ConfigurationTest-full-config.xml | 6 +
.../ConfigurationTest-xinclude-config.xml | 6 +
.../ConfigurationTest-xinclude-schema-config.xml | 7 +
.../integration/amqp/connect/AckManagerTest.java | 16 +-
.../mirror/SingleMirrorSoakTest.java | 311 +++++++++++++++++++++
tests/soak-tests/src/test/scripts/parameters.sh | 9 +-
24 files changed, 670 insertions(+), 70 deletions(-)
diff --git
a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
index 9918c1f405..3fe9b75812 100644
---
a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
+++
b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQScheduledComponent.java
@@ -212,15 +212,20 @@ public abstract class ActiveMQScheduledComponent
implements ActiveMQComponent, R
}
public synchronized ActiveMQScheduledComponent setPeriod(long period) {
- this.period = period;
- restartIfNeeded();
+ if (this.period != period) {
+ this.period = period;
+ restartIfNeeded();
+ }
return this;
}
public synchronized ActiveMQScheduledComponent setPeriod(long period,
TimeUnit unit) {
- this.period = period;
- this.timeUnit = unit;
- restartIfNeeded();
+ if (unit == null) throw new NullPointerException("unit is required");
+ if (this.period != period || this.timeUnit != unit) {
+ this.period = period;
+ this.timeUnit = unit;
+ restartIfNeeded();
+ }
return this;
}
diff --git
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java
index a80137da92..0f0417f01b 100644
---
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java
+++
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ActiveMQScheduledComponentTest.java
@@ -201,6 +201,52 @@ public class ActiveMQScheduledComponentTest {
}
}
+
+ @Test
+ public void testUpdatePeriod() throws Throwable {
+ final ReusableLatch latch = new ReusableLatch(1);
+
+ final ActiveMQScheduledComponent local = new
ActiveMQScheduledComponent(10, TimeUnit.MILLISECONDS, true) {
+ @Override
+ public void run() {
+ latch.countDown();
+ }
+ };
+
+ local.start();
+
+ try {
+ Assert.assertFalse(latch.await(20, TimeUnit.MILLISECONDS));
+
+ latch.setCount(1);
+ local.delay();
+ Assert.assertTrue(latch.await(20, TimeUnit.MILLISECONDS));
+
+ latch.setCount(1);
+ Assert.assertFalse(latch.await(20, TimeUnit.MILLISECONDS));
+
+ local.setPeriod(TimeUnit.HOURS.toMillis(1), TimeUnit.MILLISECONDS);
+
+ latch.setCount(1);
+ local.delay();
+ Assert.assertFalse(latch.await(20, TimeUnit.MILLISECONDS));
+
+ local.setPeriod(1);
+ local.delay();
+ Assert.assertTrue(latch.await(20, TimeUnit.MILLISECONDS));
+
+ local.setPeriod(1, TimeUnit.SECONDS);
+
+ latch.setCount(1);
+ local.delay();
+
+ Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+ } finally {
+ local.stop();
+ local.stop(); // calling stop again should not be an issue.
+ }
+ }
+
@Test
public void testUsingCustomInitialDelay() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 48fb2b6b2a..0c84a13336 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -698,6 +698,17 @@ public final class ActiveMQDefaultConfiguration {
private static final boolean DEFAULT_MANAGEMENT_MESSAGE_RBAC = false;
+
+ // These properties used to defined with this prefix.
+ // I'm keeping the older property name in an attempt to guarantee
compatibility
+ private static final String FORMER_ACK_RETRY_CLASS_NAME =
"org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckRetry";
+ private static final int DEFAULT_MIRROR_ACK_MANAGER_MIN_QUEUE_ATTEMPTS =
Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME +
".MIN_QUEUE_ATTEMPTS", "5"));;
+ private static final int DEFAULT_MIRROR_ACK_MANAGER_MAX_PAGE_ATTEMPTS =
Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME +
".MAX_PAGE_ATTEMPT", "2"));;
+
+ private static final int DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY =
Integer.parseInt(System.getProperty(FORMER_ACK_RETRY_CLASS_NAME +
".RETRY_DELAY", "100"));;
+
+ private static final boolean DEFAULT_MIRROR_PAGE_TRANSACTION = false;
+
/**
* If true then the ActiveMQ Artemis Server will make use of any Protocol
Managers that are in available on the classpath. If false then only the core
protocol will be available, unless in Embedded mode where users can inject
their own Protocol Managers.
*/
@@ -1918,4 +1929,25 @@ public final class ActiveMQDefaultConfiguration {
public static boolean getManagementMessagesRbac() {
return DEFAULT_MANAGEMENT_MESSAGE_RBAC;
}
+
+
+ /** This configures the Mirror Ack Manager number of attempts on queues
before trying page acks.
+ * It is not intended to be configured through the XML.
+ * The default value here is 5. */
+ public static int getMirrorAckManagerMinQueueAttempts() {
+ return DEFAULT_MIRROR_ACK_MANAGER_MIN_QUEUE_ATTEMPTS;
+ }
+
+ public static int getMirrorAckManagerMaxPageAttempts() {
+ return DEFAULT_MIRROR_ACK_MANAGER_MAX_PAGE_ATTEMPTS;
+ }
+
+ public static int getMirrorAckManagerRetryDelay() {
+ return DEFAULT_MIRROR_ACK_MANAGER_RETRY_DELAY;
+ }
+
+ public static boolean getDefaultMirrorPageTransaction() {
+ return DEFAULT_MIRROR_PAGE_TRANSACTION;
+ }
+
}
diff --git
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java
index 4cbd17383a..f0d70340dd 100644
---
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java
+++
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/collections/JournalHashMap.java
@@ -37,6 +37,11 @@ import
org.apache.activemq.artemis.core.persistence.Persister;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * K = Key
+ * V = Value
+ * C = Context
+ * */
public class JournalHashMap<K, V, C> implements Map<K, V> {
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
index 21c5d2f444..378c64d098 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManagerFactory.java
@@ -87,7 +87,8 @@ public class ProtonProtocolManagerFactory extends
AbstractProtocolManagerFactory
@Override
public void loadProtocolServices(ActiveMQServer server,
List<ActiveMQComponent> services) {
try {
- AckManager ackManager = AckManagerProvider.getManager(server, false);
+ AckManager ackManager = AckManagerProvider.getManager(server);
+ services.add(ackManager);
server.registerRecordsLoader(ackManager::reload);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index 1b22567ad4..abee8a6220 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -419,6 +419,8 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
+
+ snfQueue.deliverAsync();
}
private void syncDone(MessageReference reference) {
@@ -516,6 +518,7 @@ public class AMQPMirrorControllerSource extends
BasicMirrorController<Sender> im
postACKInternalMessage(ref);
return;
}
+ snfQueue.deliverAsync();
}
@Override
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
index d55d6ed71a..4028f77262 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
@@ -23,6 +23,7 @@ import
org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
@@ -161,6 +162,8 @@ public class AMQPMirrorControllerTarget extends
ProtonAbstractReceiver implement
final ActiveMQServer server;
+ final Configuration configuration;
+
DuplicateIDCache lruduplicateIDCache;
String lruDuplicateIDKey;
@@ -183,6 +186,7 @@ public class AMQPMirrorControllerTarget extends
ProtonAbstractReceiver implement
this.basicController = new BasicMirrorController(server);
this.basicController.setLink(receiver);
this.server = server;
+ this.configuration = server.getConfiguration();
this.referenceNodeStore =
sessionSPI.getProtocolManager().getReferenceIDSupplier();
mirrorContext = protonSession.getSessionSPI().getSessionContext();
}
@@ -389,8 +393,8 @@ public class AMQPMirrorControllerTarget extends
ProtonAbstractReceiver implement
}
if (logger.isTraceEnabled()) {
- logger.trace("Server {} with queue = {} being acked for {} coming
from {} targetQueue = {}",
- server.getIdentity(), queue, messageID, messageID,
targetQueue);
+ logger.trace("Server {} with queue = {} being acked for {} from {}
targetQueue = {} reason = {}",
+ server.getIdentity(), queue, messageID, ackMessage,
targetQueue, reason);
}
performAck(nodeID, targetQueue, messageID, ackMessage, reason);
@@ -407,7 +411,7 @@ public class AMQPMirrorControllerTarget extends
ProtonAbstractReceiver implement
}
if (ackManager == null) {
- ackManager = AckManagerProvider.getManager(server, true);
+ ackManager = AckManagerProvider.getManager(server);
}
ackManager.ack(nodeID, targetQueue, messageID, reason, true);
@@ -473,7 +477,7 @@ public class AMQPMirrorControllerTarget extends
ProtonAbstractReceiver implement
message.setAddress(internalAddress);
}
- final TransactionImpl transaction = new
MirrorTransaction(server.getStorageManager()).setAsync(true);
+ final TransactionImpl transaction = new
MirrorTransaction(server.getStorageManager()).setAllowPageTransaction(configuration.isMirrorPageTransaction()).setAsync(true);
transaction.addOperation(messageCompletionAck.tx);
routingContext.setTransaction(transaction);
duplicateIDCache.addToCache(duplicateIDBytes, transaction);
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
index 0b3b395d12..51f88edad8 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManager.java
@@ -29,6 +29,7 @@ import io.netty.util.collection.LongObjectHashMap;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.RecordInfo;
@@ -58,12 +59,6 @@ import org.slf4j.LoggerFactory;
public class AckManager implements ActiveMQComponent {
// we first retry on the queue a few times
- private static final short MIN_QUEUE_ATTEMPTS =
Short.parseShort(System.getProperty(AckRetry.class.getName() +
".MIN_QUEUE_ATTEMPTS", "5"));
-
- private static final short MAX_PAGE_ATTEMPTS =
Short.parseShort(System.getProperty(AckRetry.class.getName() +
".MAX_PAGE_ATTEMPT", "2"));
-
- public static final int RETRY_DELAY =
Integer.parseInt(System.getProperty(AckRetry.class.getName() + ".RETRY_DELAY",
"100"));
-
private static DisabledAckMirrorController disabledAckMirrorController =
new DisabledAckMirrorController();
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -72,6 +67,7 @@ public class AckManager implements ActiveMQComponent {
final LongSupplier sequenceGenerator;
final JournalHashMapProvider<AckRetry, AckRetry, Queue>
journalHashMapProvider;
final ActiveMQServer server;
+ final Configuration configuration;
final ReferenceIDSupplier referenceIDSupplier;
final IOCriticalErrorListener ioCriticalErrorListener;
volatile MultiStepProgress progress;
@@ -79,6 +75,7 @@ public class AckManager implements ActiveMQComponent {
public AckManager(ActiveMQServer server) {
this.server = server;
+ this.configuration = server.getConfiguration();
this.ioCriticalErrorListener = server.getIoCriticalErrorListener();
this.journal = server.getStorageManager().getMessageJournal();
this.sequenceGenerator = server.getStorageManager()::generateID;
@@ -107,9 +104,11 @@ public class AckManager implements ActiveMQComponent {
@Override
public synchronized void start() {
- logger.debug("Starting ACKManager on {} with period = {}", server,
RETRY_DELAY);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Starting ACKManager on {} with period = {},
minQueueAttempts={}, maxPageAttempts={}", server,
configuration.getMirrorAckManagerRetryDelay(),
configuration.getMirrorAckManagerQueueAttempts(),
configuration.getMirrorAckManagerPageAttempts());
+ }
if (!isStarted()) {
- scheduledComponent = new
ActiveMQScheduledComponent(server.getScheduledPool(),
server.getExecutorFactory().getExecutor(), RETRY_DELAY, RETRY_DELAY,
TimeUnit.MILLISECONDS, true) {
+ scheduledComponent = new
ActiveMQScheduledComponent(server.getScheduledPool(),
server.getExecutorFactory().getExecutor(),
server.getConfiguration().getMirrorAckManagerRetryDelay(),
server.getConfiguration().getMirrorAckManagerRetryDelay(),
TimeUnit.MILLISECONDS, true) {
@Override
public void run() {
beginRetry();
@@ -203,20 +202,20 @@ public class AckManager implements ActiveMQComponent {
// to be used with the same executor as the PagingStore executor
- public boolean retryAddress(SimpleString address,
LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> queuesToRetry) {
+ public boolean retryAddress(SimpleString address,
LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>> acksToRetry) {
MirrorController previousController =
AMQPMirrorControllerTarget.getControllerInUse();
boolean retriedPaging = false;
- logger.debug("retrying address {} on server {}", address, server);
+ logger.trace("retrying address {} on server {}", address, server);
try {
AMQPMirrorControllerTarget.setControllerInUse(disabledAckMirrorController);
- if (checkRetriesAndPaging(queuesToRetry)) {
- logger.debug("scanning paging for {}", address);
+ if (checkRetriesAndPaging(acksToRetry)) {
+ logger.trace("scanning paging for {}", address);
AckRetry key = new AckRetry();
PagingStore store =
server.getPagingManager().getPageStore(address);
for (long pageId = store.getFirstPage(); pageId <=
store.getCurrentWritingPage(); pageId++) {
- if (isEmpty(queuesToRetry)) {
+ if (isEmpty(acksToRetry)) {
logger.trace("Retry stopped while reading page {} on address
{} as the outcome is now empty, server={}", pageId, address, server);
break;
}
@@ -225,16 +224,16 @@ public class AckManager implements ActiveMQComponent {
continue;
}
try {
- if (retryPage(queuesToRetry, page, key)) {
+ if (retryPage(acksToRetry, page, key)) {
retriedPaging = true;
}
} finally {
page.usageDown();
}
}
- validateExpiredSet(queuesToRetry);
+ validateExpiredSet(acksToRetry);
} else {
- logger.debug("Page Scan not required for address {}", address);
+ logger.trace("Page Scan not required for address {}", address);
}
} catch (Throwable e) {
@@ -251,15 +250,15 @@ public class AckManager implements ActiveMQComponent {
private void validateExpireSet(long queueID, JournalHashMap<AckRetry,
AckRetry, Queue> retries) {
for (AckRetry retry : retries.valuesCopy()) {
- if (retry.getQueueAttempts() >= MIN_QUEUE_ATTEMPTS) {
- if (retry.attemptedPage() >= MAX_PAGE_ATTEMPTS) {
+ if (retry.getQueueAttempts() >=
configuration.getMirrorAckManagerQueueAttempts()) {
+ if (retry.attemptedPage() >=
configuration.getMirrorAckManagerPageAttempts()) {
if (logger.isDebugEnabled()) {
logger.debug("Retried {} {} times, giving up on the entry
now", retry, retry.getPageAttempts());
}
retries.remove(retry);
} else {
if (logger.isDebugEnabled()) {
- logger.debug("Retry {} attempted {} times on paging", retry,
retry.getPageAttempts());
+ logger.trace("Retry {} attempted {} times on paging", retry,
retry.getPageAttempts());
}
}
}
@@ -283,14 +282,14 @@ public class AckManager implements ActiveMQComponent {
}
long id = referenceIDSupplier.getID(pagedMessage.getMessage());
- logger.debug("Looking for retry on serverID={}, id={} on
server={}", serverID, id, server);
+ logger.trace("Looking for retry on serverID={}, id={} on
server={}", serverID, id, server);
key.setNodeID(serverID).setMessageID(id);
- AckRetry foundRetry = retries.get(key);
+ AckRetry ackRetry = retries.get(key);
// we first retry messages in the queue first.
// this is to avoid messages that are in transit from being
depaged into the queue
- if (foundRetry != null && foundRetry.getQueueAttempts() >
MIN_QUEUE_ATTEMPTS) {
+ if (ackRetry != null && ackRetry.getQueueAttempts() >
configuration.getMirrorAckManagerQueueAttempts()) {
Queue queue = retries.getContext();
if (queue != null) {
@@ -307,9 +306,9 @@ public class AckManager implements ActiveMQComponent {
}
}
}
- retries.remove(foundRetry, transaction.getID());
+ retries.remove(ackRetry, transaction.getID());
transaction.setContainsPersistent();
- logger.debug("retry found = {} for message={} on queue",
foundRetry, pagedMessage);
+ logger.trace("retry performed ok, ackRetry={} for
message={} on queue", ackRetry, pagedMessage);
}
}
} else {
@@ -341,14 +340,14 @@ public class AckManager implements ActiveMQComponent {
Queue queue = queueRetries.getContext();
for (AckRetry retry : queueRetries.valuesCopy()) {
if (ack(retry.getNodeID(), queue, retry.getMessageID(),
retry.getReason(), false)) {
- logger.debug("Removing retry {} as the retry went ok", retry);
+ logger.trace("Removing retry {} as the retry went ok", retry);
queueRetries.remove(retry);
} else {
int retried = retry.attemptedQueue();
- if (logger.isDebugEnabled()) {
- logger.debug("retry {} attempted {} times on the queue",
retry, retried);
+ if (logger.isTraceEnabled()) {
+ logger.trace("retry {} attempted {} times on the queue",
retry, retried);
}
- if (retried >= MIN_QUEUE_ATTEMPTS) {
+ if (retried >=
configuration.getMirrorAckManagerQueueAttempts()) {
needScanOnPaging = true;
}
}
@@ -365,6 +364,8 @@ public class AckManager implements ActiveMQComponent {
AckRetry retry = new AckRetry(nodeID, messageID, reason);
journalHashMapProvider.getMap(queue.getID(), queue).put(retry, retry);
if (scheduledComponent != null) {
+ // we set the retry delay again in case it was changed.
+
scheduledComponent.setPeriod(configuration.getMirrorAckManagerRetryDelay());
scheduledComponent.delay();
}
}
@@ -377,20 +378,29 @@ public class AckManager implements ActiveMQComponent {
MessageReference reference = targetQueue.removeWithSuppliedID(nodeID,
messageID, referenceIDSupplier);
if (reference == null) {
- logger.debug("Could not find retry on reference nodeID={} (while
localID={}), messageID={} on queue {}, server={}", nodeID,
referenceIDSupplier.getDefaultNodeID(), messageID, targetQueue.getName(),
server);
+ if (logger.isDebugEnabled()) {
+ logger.debug("ACK Manager could not find reference nodeID={}
(while localID={}), messageID={} on queue {}, server={}. Adding retry with
minQueue={}, maxPage={}, delay={}", nodeID,
referenceIDSupplier.getDefaultNodeID(), messageID, targetQueue.getName(),
server, configuration.getMirrorAckManagerQueueAttempts(),
configuration.getMirrorAckManagerPageAttempts(),
configuration.getMirrorAckManagerRetryDelay());
+ printQueueDebug(targetQueue);
+ }
if (allowRetry) {
addRetry(nodeID, targetQueue, messageID, reason);
}
return false;
} else {
if (logger.isTraceEnabled()) {
- logger.trace("ack {} worked well for messageID={} nodeID={}
queue={}, targetQueue={}", server, messageID, nodeID, reference.getQueue(),
targetQueue);
+ logger.trace("ack worked well for messageID={} nodeID={} queue={},
reference={}", messageID, nodeID, reference.getQueue().getName(), reference);
+ if (reference.isPaged()) {
+ logger.trace("position for messageID={} = {}", messageID,
((PagedReference)reference).getPosition());
+ }
}
doACK(targetQueue, reference, reason);
return true;
}
}
+ private void printQueueDebug(Queue targetQueue) {
+ logger.debug("... queue {}/{} had {} consumers, {} messages, {}
scheduled messages, {} delivering messages, paging={}", targetQueue.getID(),
targetQueue.getName(), targetQueue.getConsumerCount(),
targetQueue.getMessageCount(), targetQueue.getScheduledCount(),
targetQueue.getDeliveringCount(), targetQueue.getPagingStore().isPaging());
+ }
private void doACK(Queue targetQueue, MessageReference reference, AckReason
reason) {
try {
@@ -399,9 +409,12 @@ public class AckManager implements ActiveMQComponent {
targetQueue.expire(reference, null, false);
break;
default:
- TransactionImpl transaction = new
TransactionImpl(server.getStorageManager()).setAsync(true);
+ TransactionImpl transaction = new
TransactionImpl(server.getStorageManager());
targetQueue.acknowledge(transaction, reference, reason, null,
false);
transaction.commit();
+ if (logger.isTraceEnabled()) {
+ logger.trace("Transaction {} committed on acking reference
{}", transaction.getID(), reference);
+ }
break;
}
} catch (Exception e) {
@@ -428,11 +441,6 @@ public class AckManager implements ActiveMQComponent {
public void nextStep() {
try {
if (!retryIterator.hasNext()) {
- if (retriedPaging) {
- logger.debug("Retried acks on paging, better to rebuild the
page counters");
- server.getPagingManager().rebuildCounters(null);
- }
-
logger.trace("Iterator is done on retry, server={}", server);
AckManager.this.endRetry();
} else {
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManagerProvider.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManagerProvider.java
index 15f6075358..e23f0d69bc 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManagerProvider.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AckManagerProvider.java
@@ -47,13 +47,10 @@ public class AckManagerProvider {
}
}
- public static AckManager getManager(ActiveMQServer server, boolean start) {
+ public static AckManager getManager(ActiveMQServer server) {
synchronized (managerHashMap) {
AckManager ackManager = managerHashMap.get(server);
if (ackManager != null) {
- if (start && !ackManager.isStarted()) {
- ackManager.start();
- }
return ackManager;
}
@@ -64,9 +61,6 @@ public class AckManagerProvider {
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
- if (start) {
- ackManager.start();
- }
return ackManager;
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java
index 79508f3219..c1735e6d04 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/MirrorTransaction.java
@@ -30,6 +30,8 @@ public class MirrorTransaction extends TransactionImpl {
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ boolean allowPageTransaction;
+
MirrorController controlInUse;
public MirrorTransaction(StorageManager storageManager) {
@@ -49,4 +51,13 @@ public class MirrorTransaction extends TransactionImpl {
}
}
+ @Override
+ public boolean isAllowPageTransaction() {
+ return allowPageTransaction;
+ }
+
+ public MirrorTransaction setAllowPageTransaction(boolean
allowPageTransaction) {
+ this.allowPageTransaction = allowPageTransaction;
+ return this;
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index edb56c3bd7..1df52bd0bc 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -1509,4 +1509,30 @@ public interface Configuration {
void setManagementRbacPrefix(String prefix);
+ /** This configures the Mirror Ack Manager number of attempts on queues
before trying page acks.
+ * The default value here is 5. */
+ int getMirrorAckManagerQueueAttempts();
+
+ Configuration setMirrorAckManagerQueueAttempts(int queueAttempts);
+
+ /** This configures the Mirror Ack Manager number of attempts on page acks.
+ * The default value here is 2. */
+ int getMirrorAckManagerPageAttempts();
+
+ Configuration setMirrorAckManagerPageAttempts(int pageAttempts);
+
+ /** This configures the interval in which the Mirror AckManager will retry
acks when
+ * It is not intended to be configured through the XML.
+ * The default value here is 100, and this is in milliseconds. */
+ int getMirrorAckManagerRetryDelay();
+
+ Configuration setMirrorAckManagerRetryDelay(int delay);
+
+ /** Should Mirror use Page Transactions When target destinations is paging?
+ * When a target queue on the mirror is paged, the mirror will not record
a page transaction for every message.
+ * The default is false, and the overhead of paged messages will be
smaller, but there is a possibility of eventual duplicates in case of
interrupted communication between the mirror source and target.
+ * If you set this to true there will be a record stored on the journal
for the page-transaction additionally to the record in the page store. */
+ boolean isMirrorPageTransaction();
+
+ Configuration setMirrorPageTransaction(boolean ignorePageTransactions);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 785cecce78..b99e7a53ac 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -438,6 +438,15 @@ public class ConfigurationImpl implements Configuration,
Serializable {
private boolean managementMessagesRbac =
ActiveMQDefaultConfiguration.getManagementMessagesRbac();
+ private int mirrorAckManagerMinQueueAttempts =
ActiveMQDefaultConfiguration.getMirrorAckManagerMinQueueAttempts();
+
+ private int mirrorAckManagerMaxPageAttempts =
ActiveMQDefaultConfiguration.getMirrorAckManagerMaxPageAttempts();
+
+ private int mirrorAckManagerRetryDelay =
ActiveMQDefaultConfiguration.getMirrorAckManagerRetryDelay();
+
+ private boolean mirrorPageTransaction =
ActiveMQDefaultConfiguration.getDefaultMirrorPageTransaction();
+
+
/**
* Parent folder for all data folders.
*/
@@ -3353,6 +3362,55 @@ public class ConfigurationImpl implements Configuration,
Serializable {
this.managementRbacPrefix = prefix;
}
+
+ @Override
+ public int getMirrorAckManagerQueueAttempts() {
+ return mirrorAckManagerMinQueueAttempts;
+ }
+
+ @Override
+ public ConfigurationImpl setMirrorAckManagerQueueAttempts(int
minQueueAttempts) {
+ logger.debug("Setting mirrorAckManagerMinQueueAttempts = {}",
minQueueAttempts);
+ this.mirrorAckManagerMinQueueAttempts = minQueueAttempts;
+ return this;
+ }
+
+ @Override
+ public int getMirrorAckManagerPageAttempts() {
+ return this.mirrorAckManagerMaxPageAttempts;
+ }
+
+ @Override
+ public ConfigurationImpl setMirrorAckManagerPageAttempts(int
maxPageAttempts) {
+ logger.debug("Setting mirrorAckManagerMaxPageAttempts = {}",
maxPageAttempts);
+ this.mirrorAckManagerMaxPageAttempts = maxPageAttempts;
+ return this;
+ }
+
+ @Override
+ public int getMirrorAckManagerRetryDelay() {
+ return mirrorAckManagerRetryDelay;
+ }
+
+ @Override
+ public ConfigurationImpl setMirrorAckManagerRetryDelay(int delay) {
+ logger.debug("Setting mirrorAckManagerRetryDelay = {}", delay);
+ this.mirrorAckManagerRetryDelay = delay;
+ return this;
+ }
+
+ @Override
+ public boolean isMirrorPageTransaction() {
+ return mirrorPageTransaction;
+ }
+
+ @Override
+ public Configuration setMirrorPageTransaction(boolean
ignorePageTransactions) {
+ logger.debug("Setting mirrorIgnorePageTransactions={}",
ignorePageTransactions);
+ this.mirrorPageTransaction = ignorePageTransactions;
+ return this;
+ }
+
// extend property utils with ability to auto-fill and locate from
collections
// collection entries are identified by the name() property
private static class CollectionAutoFillPropertiesUtil extends
PropertyUtilsBean {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 6b885e75d1..99012fd540 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -380,6 +380,14 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
private static final String ID_CACHE_SIZE = "id-cache-size";
+ private static final String MIRROR_ACK_MANAGER_QUEUE_ATTEMPTS =
"mirror-ack-manager-queue-attempts";
+
+ private static final String MIRROR_ACK_MANAGER_PAGE_ATTEMPTS =
"mirror-ack-manager-page-attempts";
+
+ private static final String MIRROR_ACK_MANAGER_RETRY_DELAY =
"mirror-ack-manager-retry-delay";
+
+ private static final String MIRROR_PAGE_TRANSACTION =
"mirror-page-transaction";
+
private boolean validateAIO = false;
private boolean printPageMaxSizeUsed = false;
@@ -849,6 +857,14 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
config.setManagementRbacPrefix(getString(e, "management-rbac-prefix",
config.getManagementRbacPrefix(), NO_CHECK));
+ config.setMirrorPageTransaction(getBoolean(e, MIRROR_PAGE_TRANSACTION,
config.isMirrorPageTransaction()));
+
+ config.setMirrorAckManagerPageAttempts(getInteger(e,
MIRROR_ACK_MANAGER_PAGE_ATTEMPTS, config.getMirrorAckManagerPageAttempts(),
GT_ZERO));
+
+ config.setMirrorAckManagerQueueAttempts(getInteger(e,
MIRROR_ACK_MANAGER_QUEUE_ATTEMPTS, config.getMirrorAckManagerQueueAttempts(),
GT_ZERO));
+
+ config.setMirrorAckManagerRetryDelay(getInteger(e,
MIRROR_ACK_MANAGER_RETRY_DELAY, config.getMirrorAckManagerRetryDelay(),
GT_ZERO));
+
parseAddressSettings(e, config);
parseResourceLimits(e, config);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index dcf0e01412..3b60ef2f06 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -1253,8 +1253,7 @@ public class PagingStoreImpl implements PagingStore {
return false;
}
- // not using page transaction if transaction is declared async
- final long transactionID = (tx == null || tx.isAsync()) ? -1 :
tx.getID();
+ final long transactionID = (tx != null &&
tx.isAllowPageTransaction()) ? tx.getID() : -1L;
if (pageDecorator != null) {
message = pageDecorator.apply(message);
@@ -1273,7 +1272,7 @@ public class PagingStoreImpl implements PagingStore {
currentPageSize += bytesToWrite;
}
- if (tx != null && !tx.isAsync()) {
+ if (tx != null && tx.isAllowPageTransaction()) {
installPageTransaction(tx, listCtx);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/AckRetry.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/AckRetry.java
index 11bb71b6b1..2069769a95 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/AckRetry.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/AckRetry.java
@@ -30,8 +30,8 @@ public final class AckRetry {
byte[] temporaryNodeBytes;
long messageID;
AckReason reason;
- short pageAttempts;
- short queueAttempts;
+ int pageAttempts;
+ int queueAttempts;
private static Persister persister = new Persister();
@@ -41,7 +41,7 @@ public final class AckRetry {
@Override
public String toString() {
- return "ACKRetry{" + "nodeID='" + nodeID + '\'' + ", messageID=" +
messageID + ", reason=" + reason + '}';
+ return "AckRetry{" + "nodeID='" + nodeID + '\'' + ", messageID=" +
messageID + ", reason=" + reason + ", pageAttempts=" + pageAttempts + ",
queueAttempts=" + queueAttempts + '}';
}
public AckRetry() {
@@ -92,19 +92,19 @@ public final class AckRetry {
return this;
}
- public short getPageAttempts() {
+ public int getPageAttempts() {
return pageAttempts;
}
- public short getQueueAttempts() {
+ public int getQueueAttempts() {
return queueAttempts;
}
- public short attemptedPage() {
+ public int attemptedPage() {
return ++pageAttempts;
}
- public short attemptedQueue() {
+ public int attemptedQueue() {
return ++queueAttempts;
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
index befaaf3405..0a64aa1e90 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/Transaction.java
@@ -109,4 +109,8 @@ public interface Transaction {
/** To be used on control transactions that are meant as internal and don't
really require a hard sync. */
Transaction setAsync(boolean async);
+
+ default boolean isAllowPageTransaction() {
+ return true;
+ }
}
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index d3779266bd..b0c8116257 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -915,6 +915,48 @@
</xsd:annotation>
</xsd:element>
+ <xsd:element name="mirror-ack-manager-queue-attempts" type="xsd:int"
maxOccurs="1" minOccurs="0" default="5">
+ <xsd:annotation>
+ <xsd:documentation>
+ The number of times a mirror target would retry an
acknowledgement on the queue before scanning page files for the message.
+
+ This is exposed as mirrorAckManagerQueueAttempts on broker
properties.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
+ <xsd:element name="mirror-ack-manager-page-attempts" type="xsd:int"
maxOccurs="1" minOccurs="0" default="2">
+ <xsd:annotation>
+ <xsd:documentation>
+ The number of times a mirror target would retry an
acknowledgement on paging.
+
+ This is exposed as mirrorAckManagerPageAttempts on broker
properties.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
+ <xsd:element name="mirror-ack-manager-retry-delay" type="xsd:int"
maxOccurs="1" minOccurs="0" default="100">
+ <xsd:annotation>
+ <xsd:documentation>
+ Period in milliseconds for which retries are going to be
exercised.
+ This is exposed as mirrorAckManagerRetryDelay on broker
properties.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
+ <xsd:element name="mirror-page-transaction" type="xsd:boolean"
maxOccurs="1" minOccurs="0" default="false">
+ <xsd:annotation>
+ <xsd:documentation>
+ Should Mirror use Page Transactions When target destinations
is paging?
+ When a target queue on the mirror is paged, the mirror will
not record a page transaction for every message.
+ The default is false, and the overhead of paged messages
will be smaller, but there is a possibility of eventual duplicates in case of
interrupted communication between the mirror source and target.
+ If you set this to true there will be a record stored on the
journal for the page-transaction additionally to the record in the page store.
+
+ This is exposed as mirrorPageTransactions on broker
properties.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
<xsd:element name="suppress-session-notifications" type="xsd:boolean"
default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 2b1da86de2..84a3273ad9 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -507,6 +507,11 @@ public class FileConfigurationTest extends
ConfigurationImplTest {
assertFalse(conf.getAddressSettings().get("a2").isEnableIngressTimestamp());
assertEquals(Integer.valueOf(500),
conf.getAddressSettings().get("a2").getIDCacheSize());
+ Assert.assertEquals(111, conf.getMirrorAckManagerQueueAttempts());
+ Assert.assertEquals(222, conf.getMirrorAckManagerPageAttempts());
+ Assert.assertEquals(333, conf.getMirrorAckManagerRetryDelay());
+ Assert.assertTrue(conf.isMirrorPageTransaction());
+
assertTrue(conf.getResourceLimitSettings().containsKey("myUser"));
assertEquals(104,
conf.getResourceLimitSettings().get("myUser").getMaxConnections());
assertEquals(13,
conf.getResourceLimitSettings().get("myUser").getMaxQueues());
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 137ddfa4c6..01adc7fe8f 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -541,6 +541,12 @@
<network-check-ping-command>ping-four</network-check-ping-command>
<network-check-ping6-command>ping-six</network-check-ping6-command>
<page-sync-timeout>1000</page-sync-timeout>
+
+
<mirror-ack-manager-queue-attempts>111</mirror-ack-manager-queue-attempts>
+ <mirror-ack-manager-page-attempts>222</mirror-ack-manager-page-attempts>
+ <mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
+ <mirror-page-transaction>true</mirror-page-transaction>
+
<security-settings>
<security-setting match="a1">
<permission type="createNonDurableQueue" roles="a1.1"/>
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
index 6193b1086c..f22ba09f28 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config.xml
@@ -70,6 +70,12 @@
<critical-analyzer-timeout>777</critical-analyzer-timeout>
<critical-analyzer>false</critical-analyzer>
<literal-match-markers>()</literal-match-markers>
+
+
<mirror-ack-manager-queue-attempts>111</mirror-ack-manager-queue-attempts>
+ <mirror-ack-manager-page-attempts>222</mirror-ack-manager-page-attempts>
+ <mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
+ <mirror-page-transaction>true</mirror-page-transaction>
+
<remoting-incoming-interceptors>
<class-name>org.apache.activemq.artemis.tests.unit.core.config.impl.TestInterceptor1</class-name>
<class-name>org.apache.activemq.artemis.tests.unit.core.config.impl.TestInterceptor2</class-name>
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
index 1adab32879..1c163b4a14 100644
---
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
+++
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config.xml
@@ -70,6 +70,13 @@
<critical-analyzer-timeout>777</critical-analyzer-timeout>
<critical-analyzer>false</critical-analyzer>
<literal-match-markers>()</literal-match-markers>
+
+
<mirror-ack-manager-queue-attempts>111</mirror-ack-manager-queue-attempts>
+ <mirror-ack-manager-page-attempts>222</mirror-ack-manager-page-attempts>
+ <mirror-ack-manager-retry-delay>333</mirror-ack-manager-retry-delay>
+ <mirror-page-transaction>true</mirror-page-transaction>
+
+
<xi:include
href="${xincludePath}/ConfigurationTest-xinclude-schema-config-remoting-incoming-interceptors.xml"/>
<xi:include
href="${xincludePath}/ConfigurationTest-xinclude-schema-config-remoting-outgoing-interceptors.xml"/>
<persist-delivery-count-before-delivery>true</persist-delivery-count-before-delivery>
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
index 8792f2e7f9..077764b4e4 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java
@@ -136,7 +136,7 @@ public class AckManagerTest extends ActiveMQTestBase {
ReferenceIDSupplier referenceIDSupplier = new
ReferenceIDSupplier(server1);
{
- AckManager ackManager = AckManagerProvider.getManager(server1, false);
+ AckManager ackManager = AckManagerProvider.getManager(server1);
AtomicInteger counter = new AtomicInteger(0);
@@ -161,7 +161,8 @@ public class AckManagerTest extends ActiveMQTestBase {
// in this following loop we will get the ackManager, compare the stored
retries. stop the server and validate if they were reloaded correctly
for (int repeat = 0; repeat < 2; repeat++) {
logger.info("Repeating {}", repeat);
- AckManager ackManager = AckManagerProvider.getManager(server1, true);
+ AckManager ackManager = AckManagerProvider.getManager(server1);
+ ackManager.start();
HashMap<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry,
AckRetry, Queue>>> sortedRetries = ackManager.sortRetries();
@@ -179,19 +180,22 @@ public class AckManagerTest extends ActiveMQTestBase {
Wait.assertEquals(numberOfMessages, c1s1::getMessageCount);
Wait.assertEquals(numberOfMessages, c2s2::getMessageCount);
- AckManager originalManager = AckManagerProvider.getManager(server1,
false);
+ AckManager originalManager = AckManagerProvider.getManager(server1);
server1.stop();
Assert.assertEquals(0, AckManagerProvider.getSize());
server1.start();
- AckManager newManager = AckManagerProvider.getManager(server1, false);
+ AckManager newManager = AckManagerProvider.getManager(server1);
Assert.assertEquals(1, AckManagerProvider.getSize());
- Assert.assertNotSame(originalManager,
AckManagerProvider.getManager(server1, true));
+ Assert.assertNotSame(originalManager,
AckManagerProvider.getManager(server1));
+ AckManager manager = AckManagerProvider.getManager(server1);
+ Wait.assertTrue(manager::isStarted, 5_000);
Assert.assertEquals(1, AckManagerProvider.getSize());
Assert.assertNotSame(newManager, ackManager);
}
- AckManager ackManager = AckManagerProvider.getManager(server1, true);
+ AckManager ackManager = AckManagerProvider.getManager(server1);
+ ackManager.start();
HashMap<SimpleString, LongObjectHashMap<JournalHashMap<AckRetry,
AckRetry, Queue>>> sortedRetries = ackManager.sortRetries();
Assert.assertEquals(1, sortedRetries.size());
LongObjectHashMap<JournalHashMap<AckRetry, AckRetry, Queue>>
acksOnAddress = sortedRetries.get(c1s1.getAddress());
diff --git
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java
new file mode 100644
index 0000000000..c93d419ff9
--- /dev/null
+++
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/SingleMirrorSoakTest.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.io.File;
+import java.io.StringWriter;
+import java.lang.invoke.MethodHandles;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import org.apache.activemq.artemis.tests.soak.SoakTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.apache.activemq.artemis.utils.TestParameters;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.actors.OrderedExecutor;
+import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SingleMirrorSoakTest extends SoakTestBase {
+
+ private static final String TEST_NAME = "SINGLE_MIRROR_SOAK";
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ // Set this to true and log4j will be configured with some relevant
log.trace for the AckManager at the server's
+ private static final boolean TRACE_LOGS =
Boolean.parseBoolean(TestParameters.testProperty(TEST_NAME, "TRACE_LOGS",
"false"));
+ private static final int NUMBER_MESSAGES =
TestParameters.testProperty(TEST_NAME, "NUMBER_MESSAGES", 2_500);
+ private static final int RECEIVE_COMMIT =
TestParameters.testProperty(TEST_NAME, "RECEIVE_COMMIT", 100);
+ private static final int SEND_COMMIT =
TestParameters.testProperty(TEST_NAME, "SEND_COMMIT", 100);
+ private static final int KILL_INTERNAL =
TestParameters.testProperty(TEST_NAME, "KILL_INTERVAL", 500);
+ private static final int SNF_TIMEOUT =
TestParameters.testProperty(TEST_NAME, "SNF_TIMEOUT", 60_000);
+
+ private static final String TOPIC_NAME = "topicTest";
+
+ private static String body;
+
+ static {
+ StringWriter writer = new StringWriter();
+ while (writer.getBuffer().length() < 30 * 1024) {
+ writer.append("The sky is blue, ..... watch out for poop from the
birds though!...");
+ }
+ body = writer.toString();
+ }
+
+ public static final String DC1_NODE = "SingleMirrorSoakTest/DC1";
+ public static final String DC2_NODE = "SingleMirrorSoakTest/DC2";
+
+ volatile Process processDC1;
+ volatile Process processDC2;
+
+ @After
+ public void destroyServers() {
+ if (processDC1 != null) {
+ processDC1.destroyForcibly();
+ }
+ if (processDC2 != null) {
+ processDC2.destroyForcibly();
+ }
+
+ }
+
+ private static final String DC1_URI = "tcp://localhost:61616";
+ private static final String DC2_URI = "tcp://localhost:61618";
+
+ private static void createServer(String serverName,
+ String connectionName,
+ String mirrorURI,
+ int porOffset,
+ boolean paging) throws Exception {
+ File serverLocation = getFileServerLocation(serverName);
+ deleteDirectory(serverLocation);
+
+ HelperCreate cliCreateServer = new HelperCreate();
+
cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
+ cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
+ cliCreateServer.setClustered(false);
+ cliCreateServer.setNoWeb(false);
+ cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor",
"--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name",
DC1_NODE);
+ cliCreateServer.addArgs("--addresses", TOPIC_NAME);
+ cliCreateServer.setPortOffset(porOffset);
+ cliCreateServer.createServer();
+
+ Properties brokerProperties = new Properties();
+ brokerProperties.put("AMQPConnections." + connectionName + ".uri",
mirrorURI);
+ brokerProperties.put("AMQPConnections." + connectionName +
".retryInterval", "1000");
+ brokerProperties.put("AMQPConnections." + connectionName + ".type",
AMQPBrokerConnectionAddressType.MIRROR.toString());
+ brokerProperties.put("AMQPConnections." + connectionName +
".connectionElements.mirror.sync", "false");
+ brokerProperties.put("largeMessageSync", "false");
+ brokerProperties.put("mirrorAckManagerPageAttempts", "10");
+ brokerProperties.put("mirrorAckManagerRetryDelay", "1000");
+ // if we don't use pageTransactions we may eventually get a few
duplicates
+ brokerProperties.put("mirrorPageTransaction", "true");
+ File brokerPropertiesFile = new File(serverLocation,
"broker.properties");
+ saveProperties(brokerProperties, brokerPropertiesFile);
+
+ File brokerXml = new File(serverLocation, "/etc/broker.xml");
+ Assert.assertTrue(brokerXml.exists());
+ // Adding redistribution delay to broker configuration
+ Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting
match=\"#\">", "<address-setting match=\"#\">\n\n" + "
<redistribution-delay>0</redistribution-delay> <!-- added by
SimpleMirrorSoakTest.java --> \n"));
+ if (paging) {
+ Assert.assertTrue(FileUtil.findReplace(brokerXml,
"<max-size-messages>-1</max-size-messages>",
"<max-size-messages>1</max-size-messages>"));
+ Assert.assertTrue(FileUtil.findReplace(brokerXml,
"<max-read-page-bytes>20M</max-read-page-bytes>",
"<max-read-page-bytes>-1</max-read-page-bytes>"));
+ Assert.assertTrue(FileUtil.findReplace(brokerXml,
"<max-read-page-messages>-1</max-read-page-messages>",
"<max-read-page-messages>100000</max-read-page-messages>\n" + "
<prefetch-page-messages>10000</prefetch-page-messages>"));
+ }
+
+ if (TRACE_LOGS) {
+ File log4j = new File(serverLocation, "/etc/log4j2.properties");
+ Assert.assertTrue(FileUtil.findReplace(log4j,
"logger.artemis_utils.level=INFO", "logger.artemis_utils.level=INFO\n" +
+ "\n" +
"logger.ack.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager\n"
+ + "logger.ack.level=TRACE\n"
+ +
"logger.config.name=org.apache.activemq.artemis.core.config.impl.ConfigurationImpl\n"
+ + "logger.config.level=TRACE\n"
+ + "appender.console.filter.threshold.type = ThresholdFilter\n"
+ + "appender.console.filter.threshold.level = info"));
+ }
+
+ }
+
+ public static void createRealServers(boolean paging) throws Exception {
+ createServer(DC1_NODE, "mirror", DC2_URI, 0, paging);
+ createServer(DC2_NODE, "mirror", DC1_URI, 2, paging);
+ }
+
+ private void startServers() throws Exception {
+ processDC1 = startServer(DC1_NODE, -1, -1, new
File(getServerLocation(DC1_NODE), "broker.properties"));
+ processDC2 = startServer(DC2_NODE, -1, -1, new
File(getServerLocation(DC2_NODE), "broker.properties"));
+
+ ServerUtil.waitForServerToStart(0, 10_000);
+ ServerUtil.waitForServerToStart(2, 10_000);
+ }
+
+ @Test
+ public void testInterruptedMirrorTransfer() throws Exception {
+ createRealServers(true);
+ startServers();
+
+
+ Assert.assertTrue(KILL_INTERNAL > SEND_COMMIT);
+
+ String clientIDA = "nodeA";
+ String clientIDB = "nodeB";
+ String subscriptionID = "my-order";
+ String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";
+
+ ConnectionFactory connectionFactoryDC1A =
CFUtil.createConnectionFactory("amqp", DC1_URI);
+
+ consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, 0, false,
false, RECEIVE_COMMIT);
+ consume(connectionFactoryDC1A, clientIDB, subscriptionID, 0, 0, false,
false, RECEIVE_COMMIT);
+
+ SimpleManagement managementDC1 = new SimpleManagement(DC1_URI, null,
null);
+ SimpleManagement managementDC2 = new SimpleManagement(DC2_URI, null,
null);
+
+ runAfter(() -> managementDC1.close());
+ runAfter(() -> managementDC2.close());
+
+ Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." +
subscriptionID));
+ Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." +
subscriptionID));
+ Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." +
subscriptionID));
+ Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." +
subscriptionID));
+
+ ExecutorService executorService = Executors.newFixedThreadPool(3);
+ runAfter(executorService::shutdownNow);
+ executorService.execute(() -> {
+ try {
+ consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0,
NUMBER_MESSAGES, true, false, RECEIVE_COMMIT);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ });
+ executorService.execute(() -> {
+ try {
+ consume(connectionFactoryDC1A, clientIDB, subscriptionID, 0,
NUMBER_MESSAGES, true, false, RECEIVE_COMMIT);
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ });
+
+ OrderedExecutor restartExeuctor = new OrderedExecutor(executorService);
+ AtomicBoolean running = new AtomicBoolean(true);
+ runAfter(() -> running.set(false));
+
+ try (Connection connection = connectionFactoryDC1A.createConnection()) {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageProducer producer =
session.createProducer(session.createTopic(TOPIC_NAME));
+ for (int i = 0; i < NUMBER_MESSAGES; i++) {
+ TextMessage message = session.createTextMessage(body);
+ message.setIntProperty("i", i);
+ message.setBooleanProperty("large", false);
+ producer.send(message);
+ if (i > 0 && i % SEND_COMMIT == 0) {
+ logger.info("Sent {} messages", i);
+ session.commit();
+ }
+ if (i > 0 && i % KILL_INTERNAL == 0) {
+ restartExeuctor.execute(() -> {
+ if (running.get()) {
+ try {
+ logger.info("Restarting target server (DC2)");
+ if (processDC2 != null) {
+ processDC2.destroyForcibly();
+ processDC2.waitFor(1, TimeUnit.MINUTES);
+ processDC2 = null;
+ }
+ processDC2 = startServer(DC2_NODE, 2, 10_000, new
File(getServerLocation(DC2_NODE), "broker.properties"));
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ });
+ }
+ }
+ session.commit();
+ running.set(false);
+ }
+
+ Wait.assertEquals(0, () -> getCount(managementDC1, snfQueue),
SNF_TIMEOUT);
+ Wait.assertEquals(0, () -> getCount(managementDC2, snfQueue),
SNF_TIMEOUT);
+ Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." +
subscriptionID), 10_000);
+ Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." +
subscriptionID), 10_000);
+ Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." +
subscriptionID), 10_000);
+ Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." +
subscriptionID), 10_000);
+ }
+
+ private static void consume(ConnectionFactory factory,
+ String clientID,
+ String subscriptionID,
+ int start,
+ int numberOfMessages,
+ boolean expectEmpty,
+ boolean assertBody,
+ int batchCommit) throws Exception {
+ try (Connection connection = factory.createConnection()) {
+ connection.setClientID(clientID);
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ Topic topic = session.createTopic(TOPIC_NAME);
+ connection.start();
+ MessageConsumer consumer = session.createDurableConsumer(topic,
subscriptionID);
+ boolean failed = false;
+
+ int pendingCommit = 0;
+
+ for (int i = start; i < start + numberOfMessages; i++) {
+ TextMessage message = (TextMessage) consumer.receive(10_000);
+ Assert.assertNotNull(message);
+ logger.debug("Received message {}, large={}",
message.getIntProperty("i"), message.getBooleanProperty("large"));
+ if (message.getIntProperty("i") != i) {
+ failed = true;
+ logger.warn("Expected message {} but got {}", i,
message.getIntProperty("i"));
+ }
+ logger.debug("Consumed {}, large={}", i,
message.getBooleanProperty("large"));
+ pendingCommit++;
+ if (pendingCommit >= batchCommit) {
+ logger.info("received {}", i);
+ session.commit();
+ pendingCommit = 0;
+ }
+ }
+ session.commit();
+
+ Assert.assertFalse(failed);
+
+ if (expectEmpty) {
+ Assert.assertNull(consumer.receiveNoWait());
+ }
+ }
+ }
+
+ public long getCount(SimpleManagement simpleManagement, String queue)
throws Exception {
+ try {
+ long value = simpleManagement.getMessageCountOnQueue(queue);
+ logger.debug("count on queue {} is {}", queue, value);
+ return value;
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ return -1;
+ }
+ }
+}
diff --git a/tests/soak-tests/src/test/scripts/parameters.sh
b/tests/soak-tests/src/test/scripts/parameters.sh
index 62908b885b..ddbe3aa2e6 100755
--- a/tests/soak-tests/src/test/scripts/parameters.sh
+++ b/tests/soak-tests/src/test/scripts/parameters.sh
@@ -141,4 +141,11 @@ export TEST_CLIENT_FAILURE_OPENWIRE_MEMORY_CLIENT=-Xmx256m
export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_TEST_ENABLED=true
export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_SERVERS=3
export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_QUEUES=200
-export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_WORKERS=10
\ No newline at end of file
+export TEST_CLUSTER_NOTIFICATIONS_CONTINUITY_NUMBER_OF_WORKERS=10
+
+export TEST_SINGLE_MIRROR_SOAK_TRACE_LOGS=false
+export TEST_SINGLE_MIRROR_SOAK_NUMBER_MESSAGES=2500
+export TEST_SINGLE_MIRROR_SOAK_RECEIVE_COMMIT=100
+export TEST_SINGLE_MIRROR_SOAK_SEND_COMMIT=100
+export TEST_SINGLE_MIRROR_SOAK_KILL_INTERVAL=500
+export TEST_SINGLE_MIRROR_SOAK_SNF_TIMEOUT=60000
\ No newline at end of file