This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 7fb4f80 ARTEMIS-3464 Missing ACKs on Page and Mirror
new 9de2888 This closes #3728
7fb4f80 is described below
commit 7fb4f806495096439a2a8a9f33a5d01a535b82ab
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Aug 30 17:09:17 2021 -0400
ARTEMIS-3464 Missing ACKs on Page and Mirror
---
.../amqp/connect/AMQPBrokerConnection.java | 4 +
.../connect/mirror/AMQPMirrorControllerTarget.java | 73 ++++++-
.../amqp/connect/mirror/ReferenceNodeStore.java | 5 +
.../core/paging/cursor/PageSubscription.java | 14 +-
.../paging/cursor/impl/PageSubscriptionImpl.java | 133 +++++++++++++
.../transaction/TransactionOperationAbstract.java | 9 +
.../artemis/tests/util/ActiveMQTestBase.java | 9 +-
.../integration/amqp/connect/AMQPReplicaTest.java | 27 ++-
.../integration/amqp/connect/PagedMirrorTest.java | 161 +++++++++++++++
.../paging/IndividualAckPagingTest.java | 172 +++++++++++++++++
.../tests/integration/paging/PageAckScanTest.java | 171 ++++++++++++++++
tests/smoke-tests/pom.xml | 32 +++
.../servers/brokerConnect/pagedA/broker.xml | 215 +++++++++++++++++++++
.../brokerConnect/pagedA/logging.properties | 86 +++++++++
.../servers/brokerConnect/pagedB/broker.xml | 215 +++++++++++++++++++++
.../brokerConnect/pagedB/logging.properties | 86 +++++++++
.../brokerConnection/PagedMirrorSmokeTest.java | 143 ++++++++++++++
17 files changed, 1535 insertions(+), 20 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index 3d38eac..0cfcd4b 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -393,6 +393,10 @@ public class AMQPBrokerConnection implements
ClientConnectionLifeCycleListener,
mirrorControlQueue = server.createQueue(new
QueueConfiguration(getMirrorSNF(replicaConfig)).setAddress(getMirrorSNF(replicaConfig)).setRoutingType(RoutingType.ANYCAST).setDurable(replicaConfig.isDurable()).setInternal(true),
true);
}
+ if (logger.isDebugEnabled()) {
+ logger.debug("Mirror queue " + mirrorControlQueue.getName());
+ }
+
mirrorControlQueue.setMirrorController(true);
QueueBinding snfReplicaQueueBinding =
(QueueBinding)server.getPostOffice().getBinding(getMirrorSNF(replicaConfig));
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
index 978c166..7f59ef7 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
import
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -45,7 +46,6 @@ import
org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
import org.apache.activemq.artemis.utils.ByteUtil;
-import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.pools.MpscPool;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
@@ -95,7 +95,7 @@ public class AMQPMirrorControllerTarget extends
ProtonAbstractReceiver implement
public TransactionOperationAbstract tx = new
TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
- completeOperation();
+ connectionRun();
}
};
@@ -121,10 +121,10 @@ public class AMQPMirrorControllerTarget extends
ProtonAbstractReceiver implement
@Override
public void done() {
- completeOperation();
+ connectionRun();
}
- private void completeOperation() {
+ public void connectionRun() {
connection.runNow(ACKMessageOperation.this);
}
@@ -146,7 +146,7 @@ public class AMQPMirrorControllerTarget extends
ProtonAbstractReceiver implement
DuplicateIDCache lruduplicateIDCache;
String lruDuplicateIDKey;
- private final NodeStore<MessageReference> referenceNodeStore;
+ private final ReferenceNodeStore referenceNodeStore;
public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
AMQPConnectionContext connection,
@@ -354,6 +354,20 @@ public class AMQPMirrorControllerTarget extends
ProtonAbstractReceiver implement
}
+ private void performAckOnPage(String nodeID, long messageID, Queue
targetQueue, ACKMessageOperation ackMessageOperation) {
+ if (targetQueue.getPagingStore().isPaging()) {
+ PageAck pageAck = new PageAck(nodeID, messageID, ackMessageOperation);
+ targetQueue.getPageSubscription().addScanAck(pageAck, pageAck,
pageAck);
+ targetQueue.getPageSubscription().performScanAck();
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Post ack Server " + server + " could not find
messageID = " + messageID +
+ " representing nodeID=" + nodeID);
+ }
+
OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation);
+ }
+ }
+
private void performAck(String nodeID, long messageID, Queue targetQueue,
ACKMessageOperation ackMessageOperation, boolean retry) {
if (logger.isTraceEnabled()) {
logger.trace("performAck (nodeID=" + nodeID + ", messageID=" +
messageID + ")" + ", targetQueue=" + targetQueue.getName());
@@ -380,10 +394,7 @@ public class AMQPMirrorControllerTarget extends
ProtonAbstractReceiver implement
logger.warn(e.getMessage(), e);
}
} else {
- if (logger.isDebugEnabled()) {
- logger.debug("Post ack Server " + server + " could not find
messageID = " + messageID +
- " representing nodeID=" + nodeID);
- }
+ performAckOnPage(nodeID, messageID, targetQueue, ackMessageOperation);
}
}
@@ -476,4 +487,48 @@ public class AMQPMirrorControllerTarget extends
ProtonAbstractReceiver implement
public void sendMessage(Message message, RoutingContext context,
List<MessageReference> refs) {
}
+ // I need a supress warning annotation here
+ // because errorProne is issuing an error her, however I really intend to
compare PageACK against PagedReference
+ @SuppressWarnings("ComparableType")
+ class PageAck implements Comparable<PagedReference>, Runnable {
+
+ final String nodeID;
+ final long messageID;
+ final ACKMessageOperation operation;
+
+ PageAck(String nodeID, long messageID, ACKMessageOperation operation) {
+ this.nodeID = nodeID;
+ this.messageID = messageID;
+ this.operation = operation;
+ }
+
+ @Override
+ public int compareTo(PagedReference reference) {
+ String refNodeID = referenceNodeStore.getServerID(reference);
+ long refMessageID = referenceNodeStore.getID(reference);
+ if (refNodeID == null) {
+ refNodeID = referenceNodeStore.getDefaultNodeID();
+ }
+
+ if (refNodeID.equals(nodeID)) {
+ long diff = refMessageID - messageID;
+ if (diff == 0) {
+ return 0;
+ } else if (diff > 0) {
+ return 1;
+ } else {
+ return -1;
+ }
+ } else {
+ return -1;
+ }
+ }
+
+ @Override
+ public void run() {
+ operation.connectionRun();
+ }
+
+ }
+
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
index 1633076..61b0f14 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
@@ -42,6 +42,11 @@ public class ReferenceNodeStore implements
NodeStore<MessageReference> {
String lruListID;
LongObjectHashMap<LinkedListImpl.Node<MessageReference>> lruMap;
+
+ public String getDefaultNodeID() {
+ return serverID;
+ }
+
@Override
public void storeNode(MessageReference element,
LinkedListImpl.Node<MessageReference> node) {
String list = getServerID(element);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
index eb41e63..4d7bd7b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
@@ -85,9 +85,17 @@ public interface PageSubscription {
// for internal (cursor) classes
void confirmPosition(Transaction tx, PagePosition position) throws
Exception;
- /**
- * @return the first page in use or MAX_LONG if none is in use
- */
+
+ // Add a scan function to be performed. It will be completed when you call
performScan
+ void addScanAck(Comparable<PagedReference> scanFunction, Runnable found,
Runnable notfound);
+
+ // it will schedule a scan on pages for everything that was added through
addScanAck
+ void performScanAck();
+
+
+ /**
+ * @return the first page in use or MAX_LONG if none is in use
+ */
long getFirstPage();
// Reload operations
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index 1ca83ce..4487786 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -112,6 +112,139 @@ public final class PageSubscriptionImpl implements
PageSubscription {
// Each CursorIterator will record their current PageReader in this map
private final ConcurrentLongHashMap<PageReader> pageReaders = new
ConcurrentLongHashMap<>();
+ private final AtomicInteger scheduledScanCount = new AtomicInteger(0);
+
+ private final LinkedList<PageScan> scanList = new LinkedList();
+
+ private static class PageScan {
+ final Comparable<PagedReference> scanFunction;
+ final Runnable found;
+ final Runnable notfound;
+
+ public Comparable<PagedReference> getScanFunction() {
+ return scanFunction;
+ }
+
+ public Runnable getFound() {
+ return found;
+ }
+
+ public Runnable getNotfound() {
+ return notfound;
+ }
+
+ PageScan(Comparable<PagedReference> scanFunction, Runnable found,
Runnable notfound) {
+ this.scanFunction = scanFunction;
+ this.found = found;
+ this.notfound = notfound;
+ }
+ }
+
+ @Override
+ public void addScanAck(Comparable<PagedReference> scanFunction, Runnable
found, Runnable notfound) {
+ PageScan scan = new PageScan(scanFunction, found, notfound);
+ synchronized (scanList) {
+ scanList.add(scan);
+ }
+ }
+
+ @Override
+ public void performScanAck() {
+ // we should only have a max of 2 scheduled tasks
+ // one that's might still be currently running, and another one lined up
+ // no need for more than that
+ if (scheduledScanCount.incrementAndGet() < 2) {
+ executor.execute(this::actualScanAck);
+ } else {
+ scheduledScanCount.decrementAndGet();
+ }
+ }
+
+ private void actualScanAck() {
+ try {
+ PageScan[] localScanList;
+ synchronized (scanList) {
+ if (scanList.size() == 0) {
+ return;
+ }
+ localScanList = scanList.stream().toArray(i -> new PageScan[i]);
+ scanList.clear();
+ }
+
+ LinkedList<Runnable> afterCommitList = new LinkedList<>();
+ TransactionImpl tx = new TransactionImpl(store);
+ tx.addOperation(new TransactionOperationAbstract() {
+ @Override
+ public void afterCommit(Transaction tx) {
+ for (Runnable r : afterCommitList) {
+ try {
+ r.run();
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ }
+ });
+ PageIterator iterator = this.iterator(true);
+ try {
+ while (iterator.hasNext()) {
+ PagedReference reference = iterator.next();
+ boolean keepMoving = false;
+ for (int i = 0; i < localScanList.length; i++) {
+ PageScan scanElemen = localScanList[i];
+ if (scanElemen == null) {
+ continue;
+ }
+
+ int result = scanElemen.scanFunction.compareTo(reference);
+
+ if (result >= 0) {
+ if (result == 0) {
+ try {
+ PageSubscriptionImpl.this.ackTx(tx, reference);
+ if (scanElemen.found != null) {
+ afterCommitList.add(scanElemen.found);
+ }
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ }
+ } else {
+ if (scanElemen.notfound != null) {
+ scanElemen.notfound.run();
+ }
+ }
+ localScanList[i] = null;
+ } else {
+ keepMoving = true;
+ }
+ }
+
+ if (!keepMoving) {
+ break;
+ }
+ }
+ } finally {
+ iterator.close();
+ }
+
+ for (int i = 0; i < localScanList.length; i++) {
+ if (localScanList[i] != null && localScanList[i].notfound != null)
{
+ localScanList[i].notfound.run();
+ }
+ localScanList[i] = null;
+ }
+
+ if (afterCommitList.size() > 0) {
+ try {
+ tx.commit();
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ } finally {
+ scheduledScanCount.decrementAndGet();
+ }
+ }
PageSubscriptionImpl(final PageCursorProvider cursorProvider,
final PagingStore pageStore,
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperationAbstract.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperationAbstract.java
index 0deb3c0..f45f8d2 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperationAbstract.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperationAbstract.java
@@ -26,6 +26,15 @@ import
org.apache.activemq.artemis.core.server.MessageReference;
*/
public abstract class TransactionOperationAbstract implements
TransactionOperation {
+ public static TransactionOperationAbstract afterCommit(Runnable run) {
+ return new TransactionOperationAbstract() {
+ @Override
+ public void afterCommit(Transaction tx) {
+ run.run();
+ }
+ };
+ }
+
@Override
public void beforePrepare(Transaction tx) throws Exception {
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index a6fdce9..a3c1971 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -1767,10 +1767,15 @@ public abstract class ActiveMQTestBase extends Assert {
* @throws Exception
*/
protected HashMap<Integer, AtomicInteger> countJournal(Configuration
config) throws Exception {
+ File location = config.getJournalLocation();
+ return countJournal(location, config.getJournalFileSize(),
config.getJournalMinFiles(), config.getJournalPoolFiles());
+ }
+
+ protected HashMap<Integer, AtomicInteger> countJournal(File location, int
journalFileSize, int minFiles, int poolfiles) throws Exception {
final HashMap<Integer, AtomicInteger> recordsType = new HashMap<>();
- SequentialFileFactory messagesFF = new
NIOSequentialFileFactory(config.getJournalLocation(), null, 1);
+ SequentialFileFactory messagesFF = new
NIOSequentialFileFactory(location, null, 1);
- JournalImpl messagesJournal = new
JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(),
config.getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
+ JournalImpl messagesJournal = new JournalImpl(journalFileSize, minFiles,
poolfiles, 0, 0, messagesFF, "activemq-data", "amq", 1);
List<JournalFile> filesToRead = messagesJournal.orderFiles();
for (JournalFile file : filesToRead) {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
index a2f3215..b024235 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
@@ -25,9 +25,9 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
-import javax.jms.TextMessage;
import java.net.URI;
+import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
@@ -640,6 +640,8 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
server_2.startBrokerConnection(brokerConnectionName);
}
+ snfreplica = server_2.locateQueue(replica.getMirrorSNF());
+
if (pagingTarget) {
assertTrue(queueOnServer1.getPagingStore().isPaging());
}
@@ -647,10 +649,13 @@ public class AMQPReplicaTest extends
AmqpClientTestSupport {
if (acks) {
consumeMessages(largeMessage, 0, NUMBER_OF_MESSAGES / 2 - 1,
AMQP_PORT_2, false);
// Replica is async, so we need to wait acks to arrive before we
finish consuming there
+ Wait.assertEquals(0, snfreplica::getMessageCount);
Wait.assertEquals(NUMBER_OF_MESSAGES / 2,
queueOnServer1::getMessageCount);
// we consume on replica, as half the messages were acked
consumeMessages(largeMessage, NUMBER_OF_MESSAGES / 2,
NUMBER_OF_MESSAGES - 1, AMQP_PORT, true); // We consume on both servers as this
is currently replicated
+ Wait.assertEquals(0, snfreplica::getMessageCount);
consumeMessages(largeMessage, NUMBER_OF_MESSAGES / 2,
NUMBER_OF_MESSAGES - 1, AMQP_PORT_2, false);
+ Wait.assertEquals(0, snfreplica::getMessageCount);
if (largeMessage) {
validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(),
0);
@@ -658,7 +663,9 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
} else {
consumeMessages(largeMessage, 0, NUMBER_OF_MESSAGES - 1, AMQP_PORT_2,
true);
+ Wait.assertEquals(0, snfreplica::getMessageCount);
consumeMessages(largeMessage, 0, NUMBER_OF_MESSAGES - 1, AMQP_PORT,
true);
+ Wait.assertEquals(0, snfreplica::getMessageCount);
if (largeMessage) {
validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(),
0);
validateNoFilesOnLargeDir(server_2.getConfiguration().getLargeMessagesDirectory(),
0); // we kept half of the messages
@@ -845,23 +852,31 @@ public class AMQPReplicaTest extends
AmqpClientTestSupport {
int LAST_ID,
int port,
boolean assertNull) throws JMSException {
- ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:" + port);
+ ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP",
"tcp://localhost:" + port + "?jms.prefetchPolicy.all=0");
Connection conn = cf.createConnection();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
+ HashSet<Integer> idsReceived = new HashSet<>();
+
MessageConsumer consumer =
sess.createConsumer(sess.createQueue(getQueueName()));
for (int i = START_ID; i <= LAST_ID; i++) {
Message message = consumer.receive(3000);
Assert.assertNotNull(message);
- Assert.assertEquals(i, message.getIntProperty("i"));
- if (message instanceof TextMessage) {
- Assert.assertEquals(getText(largeMessage, i), ((TextMessage)
message).getText());
- }
+ Integer id = message.getIntProperty("i");
+ Assert.assertNotNull(id);
+ Assert.assertTrue(idsReceived.add(id));
}
+
if (assertNull) {
Assert.assertNull(consumer.receiveNoWait());
}
+
+ for (int i = START_ID; i <= LAST_ID; i++) {
+ Assert.assertTrue(idsReceived.remove(i));
+ }
+
+ Assert.assertTrue(idsReceived.isEmpty());
conn.close();
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/PagedMirrorTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/PagedMirrorTest.java
new file mode 100644
index 0000000..c894bf9
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/PagedMirrorTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <br>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <br>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PagedMirrorTest extends ActiveMQTestBase {
+
+ private static final Logger logger =
Logger.getLogger(PagedMirrorTest.class);
+ ActiveMQServer server1;
+
+ ActiveMQServer server2;
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+
+ server1 = createServer(true, createDefaultConfig(0, true), 1024, 10 *
1024);
+ server1.getConfiguration().getAcceptorConfigurations().clear();
+ server1.getConfiguration().addAcceptorConfiguration("server",
"tcp://localhost:61616");
+ AMQPBrokerConnectConfiguration brokerConnectConfiguration = new
AMQPBrokerConnectConfiguration("other",
"tcp://localhost:61617").setReconnectAttempts(-1).setRetryInterval(1000);
+ brokerConnectConfiguration.addElement(new
AMQPMirrorBrokerConnectionElement());
+ server1.getConfiguration().addAMQPConnection(brokerConnectConfiguration);
+
+ server2 = createServer(true, createDefaultConfig(1, true), 1024, 10 *
1024);
+ server2.getConfiguration().getAcceptorConfigurations().clear();
+ server2.getConfiguration().addAcceptorConfiguration("server",
"tcp://localhost:61617");
+ brokerConnectConfiguration = new AMQPBrokerConnectConfiguration("other",
"tcp://localhost:61616").setReconnectAttempts(-1).setRetryInterval(1000);
+ brokerConnectConfiguration.addElement(new
AMQPMirrorBrokerConnectionElement());
+ server2.getConfiguration().addAMQPConnection(brokerConnectConfiguration);
+
+ server1.start();
+ server2.start();
+ }
+
+ @Test
+ public void testPaged() throws Throwable {
+ String sendURI = "tcp://localhost:61616";
+ String consumeURI = "tcp://localhost:61616";
+ String secondConsumeURI = "tcp://localhost:61617";
+
+ Wait.waitFor(() -> server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other")
!= null);
+
+ org.apache.activemq.artemis.core.server.Queue snf1 =
server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other");
+ Assert.assertNotNull(snf1);
+
+ org.apache.activemq.artemis.core.server.Queue snf2 =
server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other");
+ Assert.assertNotNull(snf2);
+
+ File countJournalLocation =
server1.getConfiguration().getJournalLocation();
+ Assert.assertTrue(countJournalLocation.exists() &&
countJournalLocation.isDirectory());
+ String protocol = "amqp";
+
+ ConnectionFactory sendCF = CFUtil.createConnectionFactory(protocol,
sendURI);
+ ConnectionFactory consumeCF = CFUtil.createConnectionFactory(protocol,
consumeURI);
+ ConnectionFactory secondConsumeCF =
CFUtil.createConnectionFactory(protocol, secondConsumeURI);
+
+ String bodyBuffer;
+ {
+ StringBuffer buffer = new StringBuffer();
+ for (int i = 0; i < 1024; i++) {
+ buffer.append("*");
+ }
+ bodyBuffer = buffer.toString();
+ }
+
+ int NUMBER_OF_MESSAGES = 200;
+ int ACK_I = 77;
+
+ try (Connection sendConnecton = sendCF.createConnection()) {
+ Session sendSession = sendConnecton.createSession(true,
Session.SESSION_TRANSACTED);
+ Queue jmsQueue = sendSession.createQueue("someQueue");
+ MessageProducer producer = sendSession.createProducer(jmsQueue);
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ TextMessage message = sendSession.createTextMessage(bodyBuffer);
+ message.setIntProperty("i", i);
+ producer.send(message);
+ }
+ sendSession.commit();
+ }
+
+ Wait.assertEquals(0, snf1::getMessageCount);
+ Wait.assertEquals(0, snf2::getMessageCount);
+
+ try (Connection consumeConnection = consumeCF.createConnection()) {
+ Session consumeSession = consumeConnection.createSession(false, 101);
// individual ack
+ Queue jmsQueue = consumeSession.createQueue("someQueue");
+ MessageConsumer consumer = consumeSession.createConsumer(jmsQueue);
+ consumeConnection.start();
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ TextMessage message = (TextMessage) consumer.receive(6000);
+ if (message.getIntProperty("i") == ACK_I) {
+ message.acknowledge();
+ }
+ }
+ Assert.assertNull(consumer.receiveNoWait());
+ }
+ Wait.assertEquals(0, snf1::getMessageCount);
+ Wait.assertEquals(0, snf2::getMessageCount);
+ Wait.assertEquals(1, () -> acksCount(countJournalLocation), 5000, 1000);
+
+ try (Connection consumeConnection = secondConsumeCF.createConnection()) {
+ Session consumeSession = consumeConnection.createSession(true,
Session.SESSION_TRANSACTED);
+ Queue jmsQueue = consumeSession.createQueue("someQueue");
+ MessageConsumer consumer = consumeSession.createConsumer(jmsQueue);
+ consumeConnection.start();
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES - 1; i++) {
+ TextMessage message = (TextMessage) consumer.receive(6000);
+ Assert.assertNotNull(message);
+ Assert.assertNotEquals(ACK_I, message.getIntProperty("i"));
+ }
+ Assert.assertNull(consumer.receiveNoWait());
+ }
+ }
+
+ private int acksCount(File countJournalLocation) throws Exception {
+ HashMap<Integer, AtomicInteger> countJournal =
countJournal(countJournalLocation, 10485760, 2, 2);
+ AtomicInteger acksCount =
countJournal.get((int)JournalRecordIds.ACKNOWLEDGE_CURSOR);
+ return acksCount != null ? acksCount.get() : 0;
+ }
+
+}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/IndividualAckPagingTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/IndividualAckPagingTest.java
new file mode 100644
index 0000000..cd8bd20
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/IndividualAckPagingTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class IndividualAckPagingTest extends ActiveMQTestBase {
+ private static final Logger logger =
Logger.getLogger(IndividualAckPagingTest.class);
+
+ // Even though the focus of the test is paging, I'm adding non paging here
to verify the test semantics itself
+ @Parameterized.Parameters(name = "paging={0},
restartServerBeforeConsume={1}")
+ public static Collection getParams() {
+ return Arrays.asList(new Object[][]{{true, false}, {true, true}, {false,
false}});
+ }
+
+ protected final boolean paging;
+ protected final boolean restartServerBeforeConsume;
+
+ private static final String ADDRESS = "IndividualAckPagingTest";
+
+ ActiveMQServer server;
+
+ protected static final int PAGE_MAX = 10 * 1024;
+
+ protected static final int PAGE_SIZE = 5 * 1024;
+
+ public IndividualAckPagingTest(boolean paging, boolean
restartServerBeforeConsume) {
+ this.paging = paging;
+ this.restartServerBeforeConsume = restartServerBeforeConsume;
+ }
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+
+ Configuration config = createDefaultConfig(0,
true).setJournalSyncNonTransactional(false);
+
+ config.setMessageExpiryScanPeriod(-1);
+ if (paging) {
+ server = createServer(true, config, PAGE_SIZE, PAGE_MAX);
+ server.getAddressSettingsRepository().clear();
+ AddressSettings defaultSetting = new
AddressSettings().setPageSizeBytes(PAGE_SIZE).setMaxSizeBytes(PAGE_MAX).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setAutoCreateAddresses(false).setAutoCreateQueues(false);
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+ } else {
+ server = createServer(true, config, 10 * 1024 * 1024, -1);
+ server.getAddressSettingsRepository().clear();
+ AddressSettings defaultSetting = new
AddressSettings().setPageSizeBytes(10 * 1024 *
1024).setMaxSizeBytes(-1).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setAutoCreateAddresses(false).setAutoCreateQueues(false);
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+ }
+
+
+ server.start();
+
+
+ server.addAddressInfo(new
AddressInfo(ADDRESS).addRoutingType(RoutingType.ANYCAST));
+ server.createQueue(new
QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
+
+ }
+
+ @Test
+ public void testIndividualAckCore() throws Exception {
+ testIndividualAck("CORE", 1024);
+ }
+
+ @Test
+ public void testIndividualAckAMQP() throws Exception {
+ testIndividualAck("AMQP", 1024);
+ }
+
+
+ public void testIndividualAck(String protocol, int bodySize) throws
Exception {
+ ConnectionFactory factory = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:61616");
+
+ String extraBody;
+ {
+ StringBuffer buffer = new StringBuffer();
+ for (int i = 0; i < bodySize; i++) {
+ buffer.append("*");
+ }
+ extraBody = buffer.toString();
+ }
+
+ Queue queue = server.locateQueue(ADDRESS);
+
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ javax.jms.Queue jmsQueue = session.createQueue(ADDRESS);
+ MessageProducer producer = session.createProducer(jmsQueue);
+ for (int i = 0; i < 100; i++) {
+ TextMessage message = session.createTextMessage(extraBody);
+ message.setIntProperty("i", i);
+ producer.send(message);
+ }
+ session.commit();
+ }
+
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(false, 101); //
INDIVIDUAL-ACK.. same constant for AMQP and CORE
+ javax.jms.Queue jmsQueue = session.createQueue(ADDRESS);
+ connection.start();
+ MessageConsumer consumer = session.createConsumer(jmsQueue);
+ for (int i = 0; i < 100; i++) {
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ Assert.assertNotNull(message);
+ if (message.getIntProperty("i") == 77) {
+ message.acknowledge();
+ }
+ }
+ Assert.assertNull(consumer.receiveNoWait());
+ }
+
+ if (restartServerBeforeConsume) {
+ server.stop();
+ server.start();
+ }
+
+ try (Connection connection = factory.createConnection()) {
+ Session session;
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ javax.jms.Queue jmsQueue = session.createQueue(ADDRESS);
+ connection.start();
+ MessageConsumer consumer = session.createConsumer(jmsQueue);
+ for (int i = 0; i < 99; i++) {
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ Assert.assertNotNull(message);
+ Assert.assertNotEquals(77, message.getIntProperty("i"));
+ }
+ Assert.assertNull(consumer.receiveNoWait());
+ }
+ }
+
+}
\ No newline at end of file
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageAckScanTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageAckScanTest.java
new file mode 100644
index 0000000..2b907d9
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageAckScanTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PageAckScanTest extends ActiveMQTestBase {
+
+ private static final Logger logger =
Logger.getLogger(PageAckScanTest.class);
+
+ private static final String ADDRESS = "MessagesExpiredPagingTest";
+
+ ActiveMQServer server;
+
+ protected static final int PAGE_MAX = 10 * 1024;
+
+ protected static final int PAGE_SIZE = 1 * 1024;
+
+
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+
+ Configuration config = createDefaultConfig(0,
true).setJournalSyncNonTransactional(false);
+
+ config.setMessageExpiryScanPeriod(-1);
+ server = createServer(true, config, PAGE_SIZE, PAGE_MAX);
+
+ server.getAddressSettingsRepository().clear();
+
+ AddressSettings defaultSetting = new
AddressSettings().setPageSizeBytes(PAGE_SIZE).setMaxSizeBytes(PAGE_MAX).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setAutoCreateAddresses(false).setAutoCreateQueues(false);
+
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+
+ server.start();
+
+
+ server.addAddressInfo(new
AddressInfo(ADDRESS).addRoutingType(RoutingType.ANYCAST));
+ server.createQueue(new
QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
+
+ }
+
+ @Test
+ public void testScanCore() throws Exception {
+ testScan("CORE", 5000, 1000, 100, 1024);
+ }
+
+ @Test
+ public void testScanAMQP() throws Exception {
+ testScan("AMQP", 5000, 1000, 100, 1024);
+ }
+
+
+ public void testScan(String protocol, int numberOfMessages, int
numberOfMessageSecondWave, int pagingInterval, int bodySize) throws Exception {
+ ConnectionFactory factory = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:61616");
+
+ String extraBody;
+ {
+ StringBuffer buffer = new StringBuffer();
+ for (int i = 0; i < bodySize; i++) {
+ buffer.append("*");
+ }
+ extraBody = buffer.toString();
+ }
+
+ Queue queue = server.locateQueue(ADDRESS);
+ queue.getPagingStore().startPaging();
+
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ javax.jms.Queue jmsQueue = session.createQueue(ADDRESS);
+ MessageProducer producer = session.createProducer(jmsQueue);
+ for (int i = 0; i < 20; i++) {
+ TextMessage message = session.createTextMessage(extraBody);
+ message.setIntProperty("i", i);
+ producer.send(message);
+ }
+ session.commit();
+ }
+
+ queue.forEach((r) -> System.out.println("ref -> " +
r.getMessage().getIntProperty("i")));
+
+ AtomicInteger errors = new AtomicInteger(0);
+ ReusableLatch latch = new ReusableLatch(4);
+ Runnable done = latch::countDown;
+ Runnable notFound = () -> {
+ errors.incrementAndGet();
+ done.run();
+ };
+ PageSubscription subscription = queue.getPageSubscription();
+ subscription.addScanAck(new CompareI(15), done, notFound);
+ subscription.addScanAck(new CompareI(11), done, notFound);
+ subscription.addScanAck(new CompareI(99), done, notFound);
+ subscription.addScanAck(new CompareI(-30), done, notFound);
+ System.out.println("Performing scan...");
+ subscription.performScanAck();
+ Assert.assertTrue(latch.await(5, TimeUnit.MINUTES));
+ Assert.assertEquals(2, errors.get());
+
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ javax.jms.Queue jmsQueue = session.createQueue(ADDRESS);
+ connection.start();
+ MessageConsumer consumer = session.createConsumer(jmsQueue);
+ for (int i = 0; i < 18; i++) {
+ TextMessage message = (TextMessage) consumer.receive(5000);
+ Assert.assertNotNull(message);
+ Assert.assertTrue(message.getIntProperty("i") != 11 &&
message.getIntProperty("i") != 15);
+ }
+ Assert.assertNull(consumer.receiveNoWait());
+ }
+ }
+
+ // Errorprone would barf at this. it was really intended
+ @SuppressWarnings("ComparableType")
+ class CompareI implements Comparable<PagedReference> {
+ final int i;
+ CompareI(int i) {
+ this.i = i;
+ }
+
+ @Override
+ public int compareTo(PagedReference ref) {
+ System.out.println("Comparing against " +
ref.getMessage().getIntProperty("i"));
+ return ref.getMessage().getIntProperty("i").intValue() - i;
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml
index 1199697..5ec2737 100644
--- a/tests/smoke-tests/pom.xml
+++ b/tests/smoke-tests/pom.xml
@@ -1017,6 +1017,38 @@
</execution>
<execution>
<phase>test-compile</phase>
+ <id>create-brokerConnection-paged-serverA</id>
+ <goals>
+ <goal>create</goal>
+ </goals>
+ <configuration>
+ <role>amq</role>
+ <user>artemis</user>
+ <password>artemis</password>
+ <allowAnonymous>true</allowAnonymous>
+ <noWeb>true</noWeb>
+
<instance>${basedir}/target/brokerConnect/pagedA</instance>
+
<configuration>${basedir}/target/classes/servers/brokerConnect/pagedA</configuration>
+ </configuration>
+ </execution>
+ <execution>
+ <phase>test-compile</phase>
+ <id>create-brokerConnection-paged-serverB</id>
+ <goals>
+ <goal>create</goal>
+ </goals>
+ <configuration>
+ <role>amq</role>
+ <user>artemis</user>
+ <password>artemis</password>
+ <allowAnonymous>true</allowAnonymous>
+ <noWeb>true</noWeb>
+
<instance>${basedir}/target/brokerConnect/pagedB</instance>
+
<configuration>${basedir}/target/classes/servers/brokerConnect/pagedB</configuration>
+ </configuration>
+ </execution>
+ <execution>
+ <phase>test-compile</phase>
<id>create-qdr</id>
<goals>
<goal>create</goal>
diff --git
a/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedA/broker.xml
b/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedA/broker.xml
new file mode 100644
index 0000000..fd6d99e
--- /dev/null
+++
b/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedA/broker.xml
@@ -0,0 +1,215 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:xi="http://www.w3.org/2001/XInclude"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <name>ServerA</name>
+
+ <persistence-enabled>true</persistence-enabled>
+
+ <!-- this could be ASYNCIO, MAPPED, NIO
+ ASYNCIO: Linux Libaio
+ MAPPED: mmap files
+ NIO: Plain Java Files
+ -->
+ <journal-type>NIO</journal-type>
+
+ <paging-directory>data/paging</paging-directory>
+
+ <bindings-directory>data/bindings</bindings-directory>
+
+ <journal-directory>data/journal</journal-directory>
+
+ <large-messages-directory>data/large-messages</large-messages-directory>
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>10</journal-pool-files>
+
+ <journal-device-block-size>4096</journal-device-block-size>
+
+ <journal-file-size>10M</journal-file-size>
+
+ <!--
+ This value was determined through a calculation.
+ Your system could perform 25 writes per millisecond
+ on the current journal configuration.
+ That translates as a sync write every 40000 nanoseconds.
+
+ Note: If you specify 0 the system will perform writes directly to the
disk.
+ We recommend this to be 0 if you are using journalType=MAPPED and
journal-datasync=false.
+ -->
+ <journal-buffer-timeout>40000</journal-buffer-timeout>
+
+
+ <!--
+ When using ASYNCIO, this will determine the writing queue depth for
libaio.
+ -->
+ <journal-max-io>1</journal-max-io>
+ <!--
+ You can verify the network health of a particular NIC by specifying
the <network-check-NIC> element.
+ <network-check-NIC>theNicName</network-check-NIC>
+ -->
+
+ <!--
+ Use this to use an HTTP server to validate the network
+
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+ <!-- <network-check-period>10000</network-check-period> -->
+ <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+ <!-- this is a comma separated list, no spaces, just DNS or IPs
+ it should accept IPV6
+
+ Warning: Make sure you understand your network topology as this is
meant to validate if your network is valid.
+ Using IPs that could eventually disappear or be partially
visible may defeat the purpose.
+ You can use a list of multiple IPs, and if any successful
ping will make the server OK to continue running -->
+ <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+ <!-- use this to customize the ping used for ipv4 addresses -->
+ <!-- <network-check-ping-command>ping -c 1 -t %d
%s</network-check-ping-command> -->
+
+ <!-- use this to customize the ping used for ipv6 addresses -->
+ <!-- <network-check-ping6-command>ping6 -c 1
%2$s</network-check-ping6-command> -->
+
+
+
+ <!-- how often we are looking for how many bytes are being used on the
disk in ms -->
+ <disk-scan-period>5000</disk-scan-period>
+
+ <!-- once the disk hits this limit the system will block, or close the
connection in certain protocols
+ that won't support flow control. -->
+ <max-disk-usage>90</max-disk-usage>
+
+ <!-- should the broker detect dead locks and other issues -->
+ <critical-analyzer>true</critical-analyzer>
+
+ <critical-analyzer-timeout>120000</critical-analyzer-timeout>
+
+ <critical-analyzer-check-period>60000</critical-analyzer-check-period>
+
+ <critical-analyzer-policy>HALT</critical-analyzer-policy>
+
+
+ <page-sync-timeout>40000</page-sync-timeout>
+
+
+ <acceptors>
+ <acceptor
name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
+ </acceptors>
+
+
+ <broker-connections>
+ <amqp-connection uri="tcp://localhost:61617" name="outgoing"
reconnect-attempts="-1" retry-interval="100" user="B" password="B">
+ <mirror durable="true"/>
+ </amqp-connection>
+ </broker-connections>
+
+
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="amq"/>
+ <permission type="deleteNonDurableQueue" roles="amq"/>
+ <permission type="createDurableQueue" roles="amq"/>
+ <permission type="deleteDurableQueue" roles="amq"/>
+ <permission type="createAddress" roles="amq"/>
+ <permission type="deleteAddress" roles="amq"/>
+ <permission type="consume" roles="amq"/>
+ <permission type="browse" roles="amq"/>
+ <permission type="send" roles="amq"/>
+ <!-- we need this otherwise ./artemis data imp wouldn't work -->
+ <permission type="manage" roles="amq"/>
+ </security-setting>
+ </security-settings>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ <auto-create-jms-queues>true</auto-create-jms-queues>
+ <auto-create-jms-topics>true</auto-create-jms-topics>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <max-size-bytes>100000</max-size-bytes>
+ <page-size-bytes>10000</page-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ <auto-create-jms-queues>true</auto-create-jms-queues>
+ <auto-create-jms-topics>true</auto-create-jms-topics>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ" />
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue" />
+ </anycast>
+ </address>
+ <address name="someQueue">
+ <anycast>
+ <queue name="someQueue" />
+ </anycast>
+ </address>
+ </addresses>
+
+
+ <!-- Uncomment the following if you want to use the Standard
LoggingActiveMQServerPlugin pluging to log in events
+ <broker-plugins>
+ <broker-plugin
class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
+ <property key="LOG_ALL_EVENTS" value="true"/>
+ <property key="LOG_CONNECTION_EVENTS" value="true"/>
+ <property key="LOG_SESSION_EVENTS" value="true"/>
+ <property key="LOG_CONSUMER_EVENTS" value="true"/>
+ <property key="LOG_DELIVERING_EVENTS" value="true"/>
+ <property key="LOG_SENDING_EVENTS" value="true"/>
+ <property key="LOG_INTERNAL_EVENTS" value="true"/>
+ </broker-plugin>
+ </broker-plugins>
+ -->
+
+ </core>
+</configuration>
diff --git
a/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedA/logging.properties
b/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedA/logging.properties
new file mode 100644
index 0000000..810329f
--- /dev/null
+++
b/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedA/logging.properties
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Additional logger names to configure (root logger is always configured)
+# Root logger option
+loggers=org.eclipse.jetty,org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.utils.critical,org.apache.activemq.artemis.journal,org.apache.activemq.artemis.jms.server,org.apache.activemq.artemis.integration.bootstrap,org.apache.activemq.audit.base,org.apache.activemq.audit.message,org.apache.activemq.audit.resource,org.apache.activemq.artemis.utils.pools,org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirro
[...]
+
+# Special logger to debug mirror Security
+
+# Root logger level
+logger.level=INFO
+# ActiveMQ Artemis logger levels
+
+# These levels are candidates to eventually debug
+logger.org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget.level=INFO
+
+# if you have issues with CriticalAnalyzer, setting this as TRACE would give
you extra troubleshooting information.
+# but do not use it regularly as it would incur in some extra CPU usage for
this diagnostic.
+logger.org.apache.activemq.artemis.utils.critical.level=INFO
+
+logger.org.eclipse.jetty.level=WARN
+# Root logger handlers
+logger.handlers=FILE,CONSOLE
+
+# to enable audit change the level to INFO
+logger.org.apache.activemq.audit.base.level=ERROR
+logger.org.apache.activemq.audit.base.handlers=AUDIT_FILE
+logger.org.apache.activemq.audit.base.useParentHandlers=false
+
+logger.org.apache.activemq.audit.resource.level=ERROR
+logger.org.apache.activemq.audit.resource.handlers=AUDIT_FILE
+logger.org.apache.activemq.audit.resource.useParentHandlers=false
+
+logger.org.apache.activemq.audit.message.level=ERROR
+logger.org.apache.activemq.audit.message.handlers=AUDIT_FILE
+logger.org.apache.activemq.audit.message.useParentHandlers=false
+
+# Console handler configuration
+handler.CONSOLE=org.jboss.logmanager.handlers.ConsoleHandler
+handler.CONSOLE.properties=autoFlush
+handler.CONSOLE.level=TRACE
+handler.CONSOLE.autoFlush=true
+handler.CONSOLE.formatter=PATTERN
+
+# File handler configuration
+handler.FILE=org.jboss.logmanager.handlers.PeriodicRotatingFileHandler
+handler.FILE.level=TRACE
+handler.FILE.properties=suffix,append,autoFlush,fileName
+handler.FILE.suffix=.yyyy-MM-dd
+handler.FILE.append=true
+handler.FILE.autoFlush=true
+handler.FILE.fileName=${artemis.instance}/log/artemis.log
+handler.FILE.formatter=PATTERN
+
+# Formatter pattern configuration
+formatter.PATTERN=org.jboss.logmanager.formatters.PatternFormatter
+formatter.PATTERN.properties=pattern
+formatter.PATTERN.pattern=[%t] %d %-5p [%c] %s%E%n
+
+#Audit logger
+handler.AUDIT_FILE=org.jboss.logmanager.handlers.PeriodicRotatingFileHandler
+handler.AUDIT_FILE.level=INFO
+handler.AUDIT_FILE.properties=suffix,append,autoFlush,fileName
+handler.AUDIT_FILE.suffix=.yyyy-MM-dd
+handler.AUDIT_FILE.append=true
+handler.AUDIT_FILE.autoFlush=true
+handler.AUDIT_FILE.fileName=${artemis.instance}/log/audit.log
+handler.AUDIT_FILE.formatter=AUDIT_PATTERN
+
+formatter.AUDIT_PATTERN=org.jboss.logmanager.formatters.PatternFormatter
+formatter.AUDIT_PATTERN.properties=pattern
+formatter.AUDIT_PATTERN.pattern=%d [AUDIT](%t) %s%E%n
diff --git
a/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedB/broker.xml
b/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedB/broker.xml
new file mode 100644
index 0000000..c380d38
--- /dev/null
+++
b/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedB/broker.xml
@@ -0,0 +1,215 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:xi="http://www.w3.org/2001/XInclude"
+ xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq:core ">
+
+ <name>ServerB</name>
+
+ <persistence-enabled>true</persistence-enabled>
+
+ <!-- this could be ASYNCIO, MAPPED, NIO
+ ASYNCIO: Linux Libaio
+ MAPPED: mmap files
+ NIO: Plain Java Files
+ -->
+ <journal-type>NIO</journal-type>
+
+ <paging-directory>data/paging</paging-directory>
+
+ <bindings-directory>data/bindings</bindings-directory>
+
+ <journal-directory>data/journal</journal-directory>
+
+ <large-messages-directory>data/large-messages</large-messages-directory>
+
+ <journal-datasync>true</journal-datasync>
+
+ <journal-min-files>2</journal-min-files>
+
+ <journal-pool-files>10</journal-pool-files>
+
+ <journal-device-block-size>4096</journal-device-block-size>
+
+ <journal-file-size>10M</journal-file-size>
+
+ <!--
+ This value was determined through a calculation.
+ Your system could perform 25 writes per millisecond
+ on the current journal configuration.
+ That translates as a sync write every 40000 nanoseconds.
+
+ Note: If you specify 0 the system will perform writes directly to the
disk.
+ We recommend this to be 0 if you are using journalType=MAPPED and
journal-datasync=false.
+ -->
+ <journal-buffer-timeout>40000</journal-buffer-timeout>
+
+
+ <!--
+ When using ASYNCIO, this will determine the writing queue depth for
libaio.
+ -->
+ <journal-max-io>1</journal-max-io>
+ <!--
+ You can verify the network health of a particular NIC by specifying
the <network-check-NIC> element.
+ <network-check-NIC>theNicName</network-check-NIC>
+ -->
+
+ <!--
+ Use this to use an HTTP server to validate the network
+
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+ <!-- <network-check-period>10000</network-check-period> -->
+ <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+ <!-- this is a comma separated list, no spaces, just DNS or IPs
+ it should accept IPV6
+
+ Warning: Make sure you understand your network topology as this is
meant to validate if your network is valid.
+ Using IPs that could eventually disappear or be partially
visible may defeat the purpose.
+ You can use a list of multiple IPs, and if any successful
ping will make the server OK to continue running -->
+ <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+ <!-- use this to customize the ping used for ipv4 addresses -->
+ <!-- <network-check-ping-command>ping -c 1 -t %d
%s</network-check-ping-command> -->
+
+ <!-- use this to customize the ping used for ipv6 addresses -->
+ <!-- <network-check-ping6-command>ping6 -c 1
%2$s</network-check-ping6-command> -->
+
+
+
+
+ <!-- how often we are looking for how many bytes are being used on the
disk in ms -->
+ <disk-scan-period>5000</disk-scan-period>
+
+ <!-- once the disk hits this limit the system will block, or close the
connection in certain protocols
+ that won't support flow control. -->
+ <max-disk-usage>90</max-disk-usage>
+
+ <!-- should the broker detect dead locks and other issues -->
+ <critical-analyzer>true</critical-analyzer>
+
+ <critical-analyzer-timeout>120000</critical-analyzer-timeout>
+
+ <critical-analyzer-check-period>60000</critical-analyzer-check-period>
+
+ <critical-analyzer-policy>HALT</critical-analyzer-policy>
+
+
+ <page-sync-timeout>40000</page-sync-timeout>
+
+
+ <acceptors>
+ <acceptor
name="artemis">tcp://0.0.0.0:61617?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
+ </acceptors>
+
+
+ <broker-connections>
+ <amqp-connection uri="tcp://localhost:61616" name="serverA"
user="artemis" password="artemis" retry-interval="100" reconnect-attempts="-1">
+ <mirror durable="true"/>
+ </amqp-connection>
+ </broker-connections>
+
+ <security-settings>
+ <security-setting match="#">
+ <permission type="createNonDurableQueue" roles="amq"/>
+ <permission type="deleteNonDurableQueue" roles="amq"/>
+ <permission type="createDurableQueue" roles="amq"/>
+ <permission type="deleteDurableQueue" roles="amq"/>
+ <permission type="createAddress" roles="amq"/>
+ <permission type="deleteAddress" roles="amq"/>
+ <permission type="consume" roles="amq"/>
+ <permission type="browse" roles="amq"/>
+ <permission type="send" roles="amq"/>
+ <!-- we need this otherwise ./artemis data imp wouldn't work -->
+ <permission type="manage" roles="amq"/>
+ </security-setting>
+ </security-settings>
+
+ <address-settings>
+ <!-- if you define auto-create on certain queues, management has to
be auto-create -->
+ <address-setting match="activemq.management#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <!-- with -1 only the global-max-size is in use for limiting -->
+ <max-size-bytes>-1</max-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ <auto-create-jms-queues>true</auto-create-jms-queues>
+ <auto-create-jms-topics>true</auto-create-jms-topics>
+ </address-setting>
+ <!--default for catch all-->
+ <address-setting match="#">
+ <dead-letter-address>DLQ</dead-letter-address>
+ <expiry-address>ExpiryQueue</expiry-address>
+ <redelivery-delay>0</redelivery-delay>
+ <max-size-bytes>100000</max-size-bytes>
+ <page-size-bytes>10000</page-size-bytes>
+
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+ <address-full-policy>PAGE</address-full-policy>
+ <auto-create-queues>true</auto-create-queues>
+ <auto-create-addresses>true</auto-create-addresses>
+ <auto-create-jms-queues>true</auto-create-jms-queues>
+ <auto-create-jms-topics>true</auto-create-jms-topics>
+ </address-setting>
+ </address-settings>
+
+ <addresses>
+ <address name="DLQ">
+ <anycast>
+ <queue name="DLQ" />
+ </anycast>
+ </address>
+ <address name="ExpiryQueue">
+ <anycast>
+ <queue name="ExpiryQueue" />
+ </anycast>
+ </address>
+ <address name="someQueue">
+ <anycast>
+ <queue name="someQueue" />
+ </anycast>
+ </address>
+ </addresses>
+
+
+ <!-- Uncomment the following if you want to use the Standard
LoggingActiveMQServerPlugin pluging to log in events
+ <broker-plugins>
+ <broker-plugin
class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
+ <property key="LOG_ALL_EVENTS" value="true"/>
+ <property key="LOG_CONNECTION_EVENTS" value="true"/>
+ <property key="LOG_SESSION_EVENTS" value="true"/>
+ <property key="LOG_CONSUMER_EVENTS" value="true"/>
+ <property key="LOG_DELIVERING_EVENTS" value="true"/>
+ <property key="LOG_SENDING_EVENTS" value="true"/>
+ <property key="LOG_INTERNAL_EVENTS" value="true"/>
+ </broker-plugin>
+ </broker-plugins>
+ -->
+
+ </core>
+</configuration>
diff --git
a/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedB/logging.properties
b/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedB/logging.properties
new file mode 100644
index 0000000..810329f
--- /dev/null
+++
b/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedB/logging.properties
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Additional logger names to configure (root logger is always configured)
+# Root logger option
+loggers=org.eclipse.jetty,org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.utils.critical,org.apache.activemq.artemis.journal,org.apache.activemq.artemis.jms.server,org.apache.activemq.artemis.integration.bootstrap,org.apache.activemq.audit.base,org.apache.activemq.audit.message,org.apache.activemq.audit.resource,org.apache.activemq.artemis.utils.pools,org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirro
[...]
+
+# Special logger to debug mirror Security
+
+# Root logger level
+logger.level=INFO
+# ActiveMQ Artemis logger levels
+
+# These levels are candidates to eventually debug
+logger.org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget.level=INFO
+
+# if you have issues with CriticalAnalyzer, setting this as TRACE would give
you extra troubleshooting information.
+# but do not use it regularly as it would incur in some extra CPU usage for
this diagnostic.
+logger.org.apache.activemq.artemis.utils.critical.level=INFO
+
+logger.org.eclipse.jetty.level=WARN
+# Root logger handlers
+logger.handlers=FILE,CONSOLE
+
+# to enable audit change the level to INFO
+logger.org.apache.activemq.audit.base.level=ERROR
+logger.org.apache.activemq.audit.base.handlers=AUDIT_FILE
+logger.org.apache.activemq.audit.base.useParentHandlers=false
+
+logger.org.apache.activemq.audit.resource.level=ERROR
+logger.org.apache.activemq.audit.resource.handlers=AUDIT_FILE
+logger.org.apache.activemq.audit.resource.useParentHandlers=false
+
+logger.org.apache.activemq.audit.message.level=ERROR
+logger.org.apache.activemq.audit.message.handlers=AUDIT_FILE
+logger.org.apache.activemq.audit.message.useParentHandlers=false
+
+# Console handler configuration
+handler.CONSOLE=org.jboss.logmanager.handlers.ConsoleHandler
+handler.CONSOLE.properties=autoFlush
+handler.CONSOLE.level=TRACE
+handler.CONSOLE.autoFlush=true
+handler.CONSOLE.formatter=PATTERN
+
+# File handler configuration
+handler.FILE=org.jboss.logmanager.handlers.PeriodicRotatingFileHandler
+handler.FILE.level=TRACE
+handler.FILE.properties=suffix,append,autoFlush,fileName
+handler.FILE.suffix=.yyyy-MM-dd
+handler.FILE.append=true
+handler.FILE.autoFlush=true
+handler.FILE.fileName=${artemis.instance}/log/artemis.log
+handler.FILE.formatter=PATTERN
+
+# Formatter pattern configuration
+formatter.PATTERN=org.jboss.logmanager.formatters.PatternFormatter
+formatter.PATTERN.properties=pattern
+formatter.PATTERN.pattern=[%t] %d %-5p [%c] %s%E%n
+
+#Audit logger
+handler.AUDIT_FILE=org.jboss.logmanager.handlers.PeriodicRotatingFileHandler
+handler.AUDIT_FILE.level=INFO
+handler.AUDIT_FILE.properties=suffix,append,autoFlush,fileName
+handler.AUDIT_FILE.suffix=.yyyy-MM-dd
+handler.AUDIT_FILE.append=true
+handler.AUDIT_FILE.autoFlush=true
+handler.AUDIT_FILE.fileName=${artemis.instance}/log/audit.log
+handler.AUDIT_FILE.formatter=AUDIT_PATTERN
+
+formatter.AUDIT_PATTERN=org.jboss.logmanager.formatters.PatternFormatter
+formatter.AUDIT_PATTERN.properties=pattern
+formatter.AUDIT_PATTERN.pattern=%d [AUDIT](%t) %s%E%n
diff --git
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/PagedMirrorSmokeTest.java
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/PagedMirrorSmokeTest.java
new file mode 100644
index 0000000..a65cfcb
--- /dev/null
+++
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/PagedMirrorSmokeTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <br>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <br>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.brokerConnection;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PagedMirrorSmokeTest extends SmokeTestBase {
+
+ // Change this to true to generate a print-data in certain cases on this
test
+ private static final boolean PRINT_DATA = false;
+
+ private static final Logger logger =
Logger.getLogger(PagedMirrorSmokeTest.class);
+
+ public static final String SERVER_NAME_A = "brokerConnect/pagedA";
+ public static final String SERVER_NAME_B = "brokerConnect/pagedB";
+
+ Process processB;
+ Process processA;
+
+ @Before
+ public void beforeClass() throws Exception {
+ cleanupData(SERVER_NAME_A);
+ cleanupData(SERVER_NAME_B);
+ processB = startServer(SERVER_NAME_B, 1, 0);
+ processA = startServer(SERVER_NAME_A, 0, 0);
+
+ ServerUtil.waitForServerToStart(1, "B", "B", 30000);
+ ServerUtil.waitForServerToStart(0, "A", "A", 30000);
+ }
+
+ @Test
+ public void testPaged() throws Throwable {
+ String sendURI = "tcp://localhost:61616";
+ String consumeURI = "tcp://localhost:61616";
+ String secondConsumeURI = "tcp://localhost:61617";
+
+ File countJournalLocation = new File(getServerLocation(SERVER_NAME_A),
"data/journal");
+ File countJournalLocationB = new File(getServerLocation(SERVER_NAME_B),
"data/journal");
+ Assert.assertTrue(countJournalLocation.exists() &&
countJournalLocation.isDirectory());
+ String protocol = "amqp";
+
+ ConnectionFactory sendCF = CFUtil.createConnectionFactory(protocol,
sendURI);
+ ConnectionFactory consumeCF = CFUtil.createConnectionFactory(protocol,
consumeURI);
+ ConnectionFactory secondConsumeCF =
CFUtil.createConnectionFactory(protocol, secondConsumeURI);
+
+ String bodyBuffer;
+ {
+ StringBuffer buffer = new StringBuffer();
+ for (int i = 0; i < 1024; i++) {
+ buffer.append("*");
+ }
+ bodyBuffer = buffer.toString();
+ }
+
+ int NUMBER_OF_MESSAGES = 200;
+ int ACK_I = 77;
+
+ try (Connection sendConnecton = sendCF.createConnection()) {
+ Session sendSession = sendConnecton.createSession(true,
Session.SESSION_TRANSACTED);
+ Queue jmsQueue = sendSession.createQueue("someQueue");
+ MessageProducer producer = sendSession.createProducer(jmsQueue);
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ TextMessage message = sendSession.createTextMessage(bodyBuffer);
+ message.setIntProperty("i", i);
+ producer.send(message);
+ }
+ sendSession.commit();
+ }
+
+ Thread.sleep(500);
+ try (Connection consumeConnection = consumeCF.createConnection()) {
+ Session consumeSession = consumeConnection.createSession(false, 101);
// individual ack
+ Queue jmsQueue = consumeSession.createQueue("someQueue");
+ MessageConsumer consumer = consumeSession.createConsumer(jmsQueue);
+ consumeConnection.start();
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ TextMessage message = (TextMessage) consumer.receive(6000);
+ if (message.getIntProperty("i") == ACK_I) {
+ message.acknowledge();
+ }
+ }
+ Assert.assertNull(consumer.receiveNoWait());
+ }
+ Wait.assertEquals(1, () -> acksCount(countJournalLocation), 5000, 1000);
+ Wait.assertEquals(1, () -> acksCount(countJournalLocationB), 5000, 1000);
+
+ try (Connection consumeConnection = secondConsumeCF.createConnection()) {
+ Session consumeSession = consumeConnection.createSession(true,
Session.SESSION_TRANSACTED);
+ Queue jmsQueue = consumeSession.createQueue("someQueue");
+ MessageConsumer consumer = consumeSession.createConsumer(jmsQueue);
+ consumeConnection.start();
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES - 1; i++) {
+ TextMessage message = (TextMessage) consumer.receive(6000);
+ Assert.assertNotNull(message);
+ Assert.assertNotEquals(ACK_I, message.getIntProperty("i"));
+ }
+ Assert.assertNull(consumer.receiveNoWait());
+ }
+ }
+
+ private int acksCount(File countJournalLocation) throws Exception {
+ HashMap<Integer, AtomicInteger> countJournal =
countJournal(countJournalLocation, 10485760, 2, 2);
+ AtomicInteger acksCount =
countJournal.get((int)JournalRecordIds.ACKNOWLEDGE_CURSOR);
+ return acksCount != null ? acksCount.get() : 0;
+ }
+
+}