This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 7fb4f80  ARTEMIS-3464 Missing ACKs on Page and Mirror
     new 9de2888  This closes #3728
7fb4f80 is described below

commit 7fb4f806495096439a2a8a9f33a5d01a535b82ab
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Aug 30 17:09:17 2021 -0400

    ARTEMIS-3464 Missing ACKs on Page and Mirror
---
 .../amqp/connect/AMQPBrokerConnection.java         |   4 +
 .../connect/mirror/AMQPMirrorControllerTarget.java |  73 ++++++-
 .../amqp/connect/mirror/ReferenceNodeStore.java    |   5 +
 .../core/paging/cursor/PageSubscription.java       |  14 +-
 .../paging/cursor/impl/PageSubscriptionImpl.java   | 133 +++++++++++++
 .../transaction/TransactionOperationAbstract.java  |   9 +
 .../artemis/tests/util/ActiveMQTestBase.java       |   9 +-
 .../integration/amqp/connect/AMQPReplicaTest.java  |  27 ++-
 .../integration/amqp/connect/PagedMirrorTest.java  | 161 +++++++++++++++
 .../paging/IndividualAckPagingTest.java            | 172 +++++++++++++++++
 .../tests/integration/paging/PageAckScanTest.java  | 171 ++++++++++++++++
 tests/smoke-tests/pom.xml                          |  32 +++
 .../servers/brokerConnect/pagedA/broker.xml        | 215 +++++++++++++++++++++
 .../brokerConnect/pagedA/logging.properties        |  86 +++++++++
 .../servers/brokerConnect/pagedB/broker.xml        | 215 +++++++++++++++++++++
 .../brokerConnect/pagedB/logging.properties        |  86 +++++++++
 .../brokerConnection/PagedMirrorSmokeTest.java     | 143 ++++++++++++++
 17 files changed, 1535 insertions(+), 20 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
index 3d38eac..0cfcd4b 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java
@@ -393,6 +393,10 @@ public class AMQPBrokerConnection implements 
ClientConnectionLifeCycleListener,
          mirrorControlQueue = server.createQueue(new 
QueueConfiguration(getMirrorSNF(replicaConfig)).setAddress(getMirrorSNF(replicaConfig)).setRoutingType(RoutingType.ANYCAST).setDurable(replicaConfig.isDurable()).setInternal(true),
 true);
       }
 
+      if (logger.isDebugEnabled()) {
+         logger.debug("Mirror queue " + mirrorControlQueue.getName());
+      }
+
       mirrorControlQueue.setMirrorController(true);
 
       QueueBinding snfReplicaQueueBinding = 
(QueueBinding)server.getPostOffice().getBinding(getMirrorSNF(replicaConfig));
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
index 978c166..7f59ef7 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -45,7 +46,6 @@ import 
org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonAbstractReceiver;
 import org.apache.activemq.artemis.utils.ByteUtil;
-import org.apache.activemq.artemis.utils.collections.NodeStore;
 import org.apache.activemq.artemis.utils.pools.MpscPool;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
@@ -95,7 +95,7 @@ public class AMQPMirrorControllerTarget extends 
ProtonAbstractReceiver implement
       public TransactionOperationAbstract tx = new 
TransactionOperationAbstract() {
          @Override
          public void afterCommit(Transaction tx) {
-            completeOperation();
+            connectionRun();
          }
       };
 
@@ -121,10 +121,10 @@ public class AMQPMirrorControllerTarget extends 
ProtonAbstractReceiver implement
 
       @Override
       public void done() {
-         completeOperation();
+         connectionRun();
       }
 
-      private void completeOperation() {
+      public void connectionRun() {
          connection.runNow(ACKMessageOperation.this);
       }
 
@@ -146,7 +146,7 @@ public class AMQPMirrorControllerTarget extends 
ProtonAbstractReceiver implement
    DuplicateIDCache lruduplicateIDCache;
    String lruDuplicateIDKey;
 
-   private final NodeStore<MessageReference> referenceNodeStore;
+   private final ReferenceNodeStore referenceNodeStore;
 
    public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
                                      AMQPConnectionContext connection,
@@ -354,6 +354,20 @@ public class AMQPMirrorControllerTarget extends 
ProtonAbstractReceiver implement
 
    }
 
+   private void performAckOnPage(String nodeID, long messageID, Queue 
targetQueue, ACKMessageOperation ackMessageOperation) {
+      if (targetQueue.getPagingStore().isPaging()) {
+         PageAck pageAck = new PageAck(nodeID, messageID, ackMessageOperation);
+         targetQueue.getPageSubscription().addScanAck(pageAck, pageAck, 
pageAck);
+         targetQueue.getPageSubscription().performScanAck();
+      } else {
+         if (logger.isDebugEnabled()) {
+            logger.debug("Post ack Server " + server + " could not find 
messageID = " + messageID +
+                            " representing nodeID=" + nodeID);
+         }
+         
OperationContextImpl.getContext().executeOnCompletion(ackMessageOperation);
+      }
+   }
+
    private void performAck(String nodeID, long messageID, Queue targetQueue, 
ACKMessageOperation ackMessageOperation, boolean retry) {
       if (logger.isTraceEnabled()) {
          logger.trace("performAck (nodeID=" + nodeID + ", messageID=" + 
messageID + ")" + ", targetQueue=" + targetQueue.getName());
@@ -380,10 +394,7 @@ public class AMQPMirrorControllerTarget extends 
ProtonAbstractReceiver implement
             logger.warn(e.getMessage(), e);
          }
       } else {
-         if (logger.isDebugEnabled()) {
-            logger.debug("Post ack Server " + server + " could not find 
messageID = " + messageID +
-                            " representing nodeID=" + nodeID);
-         }
+         performAckOnPage(nodeID, messageID, targetQueue, ackMessageOperation);
       }
 
    }
@@ -476,4 +487,48 @@ public class AMQPMirrorControllerTarget extends 
ProtonAbstractReceiver implement
    public void sendMessage(Message message, RoutingContext context, 
List<MessageReference> refs) {
    }
 
+   // I need a suppress-warnings annotation here
+   // because errorProne is issuing an error here, however I really intend to 
compare PageAck against PagedReference
+   @SuppressWarnings("ComparableType")
+   class PageAck implements Comparable<PagedReference>, Runnable {
+
+      final String nodeID;
+      final long messageID;
+      final ACKMessageOperation operation;
+
+      PageAck(String nodeID, long messageID, ACKMessageOperation operation) {
+         this.nodeID = nodeID;
+         this.messageID = messageID;
+         this.operation = operation;
+      }
+
+      @Override
+      public int compareTo(PagedReference reference) {
+         String refNodeID = referenceNodeStore.getServerID(reference);
+         long refMessageID = referenceNodeStore.getID(reference);
+         if (refNodeID == null) {
+            refNodeID = referenceNodeStore.getDefaultNodeID();
+         }
+
+         if (refNodeID.equals(nodeID)) {
+            long diff = refMessageID - messageID;
+            if (diff == 0) {
+               return 0;
+            } else if (diff > 0) {
+               return 1;
+            } else {
+               return -1;
+            }
+         } else {
+            return -1;
+         }
+      }
+
+      @Override
+      public void run() {
+         operation.connectionRun();
+      }
+
+   }
+
 }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
index 1633076..61b0f14 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
@@ -42,6 +42,11 @@ public class ReferenceNodeStore implements 
NodeStore<MessageReference> {
    String lruListID;
    LongObjectHashMap<LinkedListImpl.Node<MessageReference>> lruMap;
 
+
+   public String getDefaultNodeID() {
+      return serverID;
+   }
+
    @Override
    public void storeNode(MessageReference element, 
LinkedListImpl.Node<MessageReference> node) {
       String list = getServerID(element);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
index eb41e63..4d7bd7b 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
@@ -85,9 +85,17 @@ public interface PageSubscription {
    // for internal (cursor) classes
    void confirmPosition(Transaction tx, PagePosition position) throws 
Exception;
 
-   /**
-    * @return the first page in use or MAX_LONG if none is in use
-    */
+
+   // Add a scan function to be performed. It will be completed when you call 
performScanAck
+   void addScanAck(Comparable<PagedReference> scanFunction, Runnable found, 
Runnable notfound);
+
+   // it will schedule a scan on pages for everything that was added through 
addScanAck
+   void performScanAck();
+
+
+      /**
+       * @return the first page in use or MAX_LONG if none is in use
+       */
    long getFirstPage();
 
    // Reload operations
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index 1ca83ce..4487786 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -112,6 +112,139 @@ public final class PageSubscriptionImpl implements 
PageSubscription {
 
    // Each CursorIterator will record their current PageReader in this map
    private final ConcurrentLongHashMap<PageReader> pageReaders = new 
ConcurrentLongHashMap<>();
+   private final AtomicInteger scheduledScanCount = new AtomicInteger(0);
+
+   private final LinkedList<PageScan> scanList = new LinkedList();
+
+   private static class PageScan {
+      final Comparable<PagedReference> scanFunction;
+      final Runnable found;
+      final Runnable notfound;
+
+      public Comparable<PagedReference> getScanFunction() {
+         return scanFunction;
+      }
+
+      public Runnable getFound() {
+         return found;
+      }
+
+      public Runnable getNotfound() {
+         return notfound;
+      }
+
+      PageScan(Comparable<PagedReference> scanFunction, Runnable found, 
Runnable notfound) {
+         this.scanFunction = scanFunction;
+         this.found = found;
+         this.notfound = notfound;
+      }
+   }
+
+   @Override
+   public void addScanAck(Comparable<PagedReference> scanFunction, Runnable 
found, Runnable notfound) {
+      PageScan scan = new PageScan(scanFunction, found, notfound);
+      synchronized (scanList) {
+         scanList.add(scan);
+      }
+   }
+
+   @Override
+   public void performScanAck() {
+      // we should only have a max of 2 scheduled tasks
+      // one that might still be currently running, and another one lined up
+      // no need for more than that
+      if (scheduledScanCount.incrementAndGet() < 2) {
+         executor.execute(this::actualScanAck);
+      } else {
+         scheduledScanCount.decrementAndGet();
+      }
+   }
+
+   private void actualScanAck() {
+      try {
+         PageScan[] localScanList;
+         synchronized (scanList) {
+            if (scanList.size() == 0) {
+               return;
+            }
+            localScanList = scanList.stream().toArray(i -> new PageScan[i]);
+            scanList.clear();
+         }
+
+         LinkedList<Runnable> afterCommitList = new LinkedList<>();
+         TransactionImpl tx = new TransactionImpl(store);
+         tx.addOperation(new TransactionOperationAbstract() {
+            @Override
+            public void afterCommit(Transaction tx) {
+               for (Runnable r : afterCommitList) {
+                  try {
+                     r.run();
+                  } catch (Throwable e) {
+                     logger.warn(e.getMessage(), e);
+                  }
+               }
+            }
+         });
+         PageIterator iterator = this.iterator(true);
+         try {
+            while (iterator.hasNext()) {
+               PagedReference reference = iterator.next();
+               boolean keepMoving = false;
+               for (int i = 0; i < localScanList.length; i++) {
+                  PageScan scanElemen = localScanList[i];
+                  if (scanElemen == null) {
+                     continue;
+                  }
+
+                  int result = scanElemen.scanFunction.compareTo(reference);
+
+                  if (result >= 0) {
+                     if (result == 0) {
+                        try {
+                           PageSubscriptionImpl.this.ackTx(tx, reference);
+                           if (scanElemen.found != null) {
+                              afterCommitList.add(scanElemen.found);
+                           }
+                        } catch (Throwable e) {
+                           logger.warn(e.getMessage(), e);
+                        }
+                     } else {
+                        if (scanElemen.notfound != null) {
+                           scanElemen.notfound.run();
+                        }
+                     }
+                     localScanList[i] = null;
+                  } else {
+                     keepMoving = true;
+                  }
+               }
+
+               if (!keepMoving) {
+                  break;
+               }
+            }
+         } finally {
+            iterator.close();
+         }
+
+         for (int i = 0; i < localScanList.length; i++) {
+            if (localScanList[i] != null && localScanList[i].notfound != null) 
{
+               localScanList[i].notfound.run();
+            }
+            localScanList[i] = null;
+         }
+
+         if (afterCommitList.size() > 0) {
+            try {
+               tx.commit();
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+            }
+         }
+      } finally {
+         scheduledScanCount.decrementAndGet();
+      }
+   }
 
    PageSubscriptionImpl(final PageCursorProvider cursorProvider,
                         final PagingStore pageStore,
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperationAbstract.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperationAbstract.java
index 0deb3c0..f45f8d2 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperationAbstract.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperationAbstract.java
@@ -26,6 +26,15 @@ import 
org.apache.activemq.artemis.core.server.MessageReference;
  */
 public abstract class TransactionOperationAbstract implements 
TransactionOperation {
 
+   public static TransactionOperationAbstract afterCommit(Runnable run) {
+      return new TransactionOperationAbstract() {
+         @Override
+         public void afterCommit(Transaction tx) {
+            run.run();
+         }
+      };
+   }
+
    @Override
    public void beforePrepare(Transaction tx) throws Exception {
 
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index a6fdce9..a3c1971 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -1767,10 +1767,15 @@ public abstract class ActiveMQTestBase extends Assert {
     * @throws Exception
     */
    protected HashMap<Integer, AtomicInteger> countJournal(Configuration 
config) throws Exception {
+      File location = config.getJournalLocation();
+      return countJournal(location, config.getJournalFileSize(), 
config.getJournalMinFiles(), config.getJournalPoolFiles());
+   }
+
+   protected HashMap<Integer, AtomicInteger> countJournal(File location, int 
journalFileSize, int minFiles, int poolfiles) throws Exception {
       final HashMap<Integer, AtomicInteger> recordsType = new HashMap<>();
-      SequentialFileFactory messagesFF = new 
NIOSequentialFileFactory(config.getJournalLocation(), null, 1);
+      SequentialFileFactory messagesFF = new 
NIOSequentialFileFactory(location, null, 1);
 
-      JournalImpl messagesJournal = new 
JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), 
config.getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
+      JournalImpl messagesJournal = new JournalImpl(journalFileSize, minFiles, 
poolfiles, 0, 0, messagesFF, "activemq-data", "amq", 1);
       List<JournalFile> filesToRead = messagesJournal.orderFiles();
 
       for (JournalFile file : filesToRead) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
index a2f3215..b024235 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
@@ -25,9 +25,9 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
-import javax.jms.TextMessage;
 
 import java.net.URI;
+import java.util.HashSet;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
@@ -640,6 +640,8 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
          server_2.startBrokerConnection(brokerConnectionName);
       }
 
+      snfreplica = server_2.locateQueue(replica.getMirrorSNF());
+
       if (pagingTarget) {
          assertTrue(queueOnServer1.getPagingStore().isPaging());
       }
@@ -647,10 +649,13 @@ public class AMQPReplicaTest extends 
AmqpClientTestSupport {
       if (acks) {
          consumeMessages(largeMessage, 0, NUMBER_OF_MESSAGES / 2 - 1, 
AMQP_PORT_2, false);
          // Replica is async, so we need to wait acks to arrive before we 
finish consuming there
+         Wait.assertEquals(0, snfreplica::getMessageCount);
          Wait.assertEquals(NUMBER_OF_MESSAGES / 2, 
queueOnServer1::getMessageCount);
          // we consume on replica, as half the messages were acked
          consumeMessages(largeMessage, NUMBER_OF_MESSAGES / 2, 
NUMBER_OF_MESSAGES - 1, AMQP_PORT, true); // We consume on both servers as this 
is currently replicated
+         Wait.assertEquals(0, snfreplica::getMessageCount);
          consumeMessages(largeMessage, NUMBER_OF_MESSAGES / 2, 
NUMBER_OF_MESSAGES - 1, AMQP_PORT_2, false);
+         Wait.assertEquals(0, snfreplica::getMessageCount);
 
          if (largeMessage) {
             
validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(),
 0);
@@ -658,7 +663,9 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
       } else {
 
          consumeMessages(largeMessage, 0, NUMBER_OF_MESSAGES - 1, AMQP_PORT_2, 
true);
+         Wait.assertEquals(0, snfreplica::getMessageCount);
          consumeMessages(largeMessage, 0, NUMBER_OF_MESSAGES - 1, AMQP_PORT, 
true);
+         Wait.assertEquals(0, snfreplica::getMessageCount);
          if (largeMessage) {
             
validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(),
 0);
             
validateNoFilesOnLargeDir(server_2.getConfiguration().getLargeMessagesDirectory(),
 0); // we kept half of the messages
@@ -845,23 +852,31 @@ public class AMQPReplicaTest extends 
AmqpClientTestSupport {
                                 int LAST_ID,
                                 int port,
                                 boolean assertNull) throws JMSException {
-      ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", 
"tcp://localhost:" + port);
+      ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", 
"tcp://localhost:" + port + "?jms.prefetchPolicy.all=0");
       Connection conn = cf.createConnection();
       Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
       conn.start();
 
+      HashSet<Integer> idsReceived = new HashSet<>();
+
       MessageConsumer consumer = 
sess.createConsumer(sess.createQueue(getQueueName()));
       for (int i = START_ID; i <= LAST_ID; i++) {
          Message message = consumer.receive(3000);
          Assert.assertNotNull(message);
-         Assert.assertEquals(i, message.getIntProperty("i"));
-         if (message instanceof TextMessage) {
-            Assert.assertEquals(getText(largeMessage, i), ((TextMessage) 
message).getText());
-         }
+         Integer id = message.getIntProperty("i");
+         Assert.assertNotNull(id);
+         Assert.assertTrue(idsReceived.add(id));
       }
+
       if (assertNull) {
          Assert.assertNull(consumer.receiveNoWait());
       }
+
+      for (int i = START_ID; i <= LAST_ID; i++) {
+         Assert.assertTrue(idsReceived.remove(i));
+      }
+
+      Assert.assertTrue(idsReceived.isEmpty());
       conn.close();
    }
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/PagedMirrorTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/PagedMirrorTest.java
new file mode 100644
index 0000000..c894bf9
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/PagedMirrorTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <br>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <br>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import 
org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PagedMirrorTest extends ActiveMQTestBase {
+
+   private static final Logger logger = 
Logger.getLogger(PagedMirrorTest.class);
+   ActiveMQServer server1;
+
+   ActiveMQServer server2;
+
+   @Before
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+
+      server1 = createServer(true, createDefaultConfig(0, true), 1024, 10 * 
1024);
+      server1.getConfiguration().getAcceptorConfigurations().clear();
+      server1.getConfiguration().addAcceptorConfiguration("server", 
"tcp://localhost:61616");
+      AMQPBrokerConnectConfiguration brokerConnectConfiguration = new 
AMQPBrokerConnectConfiguration("other", 
"tcp://localhost:61617").setReconnectAttempts(-1).setRetryInterval(1000);
+      brokerConnectConfiguration.addElement(new 
AMQPMirrorBrokerConnectionElement());
+      server1.getConfiguration().addAMQPConnection(brokerConnectConfiguration);
+
+      server2 = createServer(true, createDefaultConfig(1, true), 1024, 10 * 
1024);
+      server2.getConfiguration().getAcceptorConfigurations().clear();
+      server2.getConfiguration().addAcceptorConfiguration("server", 
"tcp://localhost:61617");
+      brokerConnectConfiguration = new AMQPBrokerConnectConfiguration("other", 
"tcp://localhost:61616").setReconnectAttempts(-1).setRetryInterval(1000);
+      brokerConnectConfiguration.addElement(new 
AMQPMirrorBrokerConnectionElement());
+      server2.getConfiguration().addAMQPConnection(brokerConnectConfiguration);
+
+      server1.start();
+      server2.start();
+   }
+
+   @Test
+   public void testPaged() throws Throwable {
+      String sendURI = "tcp://localhost:61616";
+      String consumeURI = "tcp://localhost:61616";
+      String secondConsumeURI = "tcp://localhost:61617";
+
+      Wait.waitFor(() -> server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other") 
!= null);
+
+      org.apache.activemq.artemis.core.server.Queue snf1 = 
server2.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other");
+      Assert.assertNotNull(snf1);
+
+      org.apache.activemq.artemis.core.server.Queue snf2 = 
server1.locateQueue("$ACTIVEMQ_ARTEMIS_MIRROR_other");
+      Assert.assertNotNull(snf2);
+
+      File countJournalLocation = 
server1.getConfiguration().getJournalLocation();
+      Assert.assertTrue(countJournalLocation.exists() && 
countJournalLocation.isDirectory());
+      String protocol = "amqp";
+
+      ConnectionFactory sendCF = CFUtil.createConnectionFactory(protocol, 
sendURI);
+      ConnectionFactory consumeCF = CFUtil.createConnectionFactory(protocol, 
consumeURI);
+      ConnectionFactory secondConsumeCF = 
CFUtil.createConnectionFactory(protocol, secondConsumeURI);
+
+      String bodyBuffer;
+      {
+         StringBuffer buffer = new StringBuffer();
+         for (int i = 0; i < 1024; i++) {
+            buffer.append("*");
+         }
+         bodyBuffer = buffer.toString();
+      }
+
+      int NUMBER_OF_MESSAGES = 200;
+      int ACK_I = 77;
+
+      try (Connection sendConnecton = sendCF.createConnection()) {
+         Session sendSession = sendConnecton.createSession(true, 
Session.SESSION_TRANSACTED);
+         Queue jmsQueue = sendSession.createQueue("someQueue");
+         MessageProducer producer = sendSession.createProducer(jmsQueue);
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            TextMessage message = sendSession.createTextMessage(bodyBuffer);
+            message.setIntProperty("i", i);
+            producer.send(message);
+         }
+         sendSession.commit();
+      }
+
+      Wait.assertEquals(0, snf1::getMessageCount);
+      Wait.assertEquals(0, snf2::getMessageCount);
+
+      try (Connection consumeConnection = consumeCF.createConnection()) {
+         Session consumeSession = consumeConnection.createSession(false, 101); 
// individual ack
+         Queue jmsQueue = consumeSession.createQueue("someQueue");
+         MessageConsumer consumer = consumeSession.createConsumer(jmsQueue);
+         consumeConnection.start();
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            TextMessage message = (TextMessage) consumer.receive(6000);
+            if (message.getIntProperty("i") == ACK_I) {
+               message.acknowledge();
+            }
+         }
+         Assert.assertNull(consumer.receiveNoWait());
+      }
+      Wait.assertEquals(0, snf1::getMessageCount);
+      Wait.assertEquals(0, snf2::getMessageCount);
+      Wait.assertEquals(1, () -> acksCount(countJournalLocation), 5000, 1000);
+
+      try (Connection consumeConnection = secondConsumeCF.createConnection()) {
+         Session consumeSession = consumeConnection.createSession(true, 
Session.SESSION_TRANSACTED);
+         Queue jmsQueue = consumeSession.createQueue("someQueue");
+         MessageConsumer consumer = consumeSession.createConsumer(jmsQueue);
+         consumeConnection.start();
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES - 1; i++) {
+            TextMessage message = (TextMessage) consumer.receive(6000);
+            Assert.assertNotNull(message);
+            Assert.assertNotEquals(ACK_I, message.getIntProperty("i"));
+         }
+         Assert.assertNull(consumer.receiveNoWait());
+      }
+   }
+
+   private int acksCount(File countJournalLocation) throws Exception {
+      HashMap<Integer, AtomicInteger> countJournal = 
countJournal(countJournalLocation, 10485760, 2, 2);
+      AtomicInteger acksCount = 
countJournal.get((int)JournalRecordIds.ACKNOWLEDGE_CURSOR);
+      return acksCount != null ? acksCount.get() : 0;
+   }
+
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/IndividualAckPagingTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/IndividualAckPagingTest.java
new file mode 100644
index 0000000..cd8bd20
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/IndividualAckPagingTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Sends a batch of messages, individually acknowledges exactly one of them and then verifies
+ * that a second consumer receives every message except the acknowledged one.
+ *
+ * The test is parameterized on whether the address is configured with small paging limits and
+ * on whether the server is restarted between the individual ack and the final consume.
+ */
+@RunWith(Parameterized.class)
+public class IndividualAckPagingTest extends ActiveMQTestBase {
+   private static final Logger logger = Logger.getLogger(IndividualAckPagingTest.class);
+
+   private static final String ADDRESS = "IndividualAckPagingTest";
+
+   /** Number of messages produced by each test. */
+   private static final int NUMBER_OF_MESSAGES = 100;
+
+   /** Value of the "i" property of the single message that gets individually acknowledged. */
+   private static final int ACK_I = 77;
+
+   /** INDIVIDUAL-ACK session mode.. same constant for AMQP and CORE */
+   private static final int INDIVIDUAL_ACKNOWLEDGE = 101;
+
+   protected static final int PAGE_MAX = 10 * 1024;
+
+   protected static final int PAGE_SIZE = 5 * 1024;
+
+   protected final boolean paging;
+   protected final boolean restartServerBeforeConsume;
+
+   ActiveMQServer server;
+
+   // Even though the focus of the test is paging, I'm adding non paging here to verify the test semantics itself
+   @Parameterized.Parameters(name = "paging={0}, restartServerBeforeConsume={1}")
+   public static Collection<Object[]> getParams() {
+      return Arrays.asList(new Object[][]{{true, false}, {true, true}, {false, false}});
+   }
+
+   public IndividualAckPagingTest(boolean paging, boolean restartServerBeforeConsume) {
+      this.paging = paging;
+      this.restartServerBeforeConsume = restartServerBeforeConsume;
+   }
+
+   /**
+    * Starts a persistent server with a single ANYCAST address/queue named {@link #ADDRESS}.
+    * Both variants use the PAGE address-full policy; they only differ in the page size and
+    * max size applied to the server and to the catch-all address settings.
+    */
+   @Before
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+
+      Configuration config = createDefaultConfig(0, true).setJournalSyncNonTransactional(false);
+      config.setMessageExpiryScanPeriod(-1);
+
+      // paging variant: small limits; non paging variant: 10MiB pages and a max size of -1
+      final int pageSize = paging ? PAGE_SIZE : 10 * 1024 * 1024;
+      final int maxSize = paging ? PAGE_MAX : -1;
+
+      server = createServer(true, config, pageSize, maxSize);
+      server.getAddressSettingsRepository().clear();
+      AddressSettings defaultSetting = new AddressSettings()
+         .setPageSizeBytes(pageSize)
+         .setMaxSizeBytes(maxSize)
+         .setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE)
+         .setAutoCreateAddresses(false)
+         .setAutoCreateQueues(false);
+      server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+      server.start();
+
+      server.addAddressInfo(new AddressInfo(ADDRESS).addRoutingType(RoutingType.ANYCAST));
+      server.createQueue(new QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
+   }
+
+   @Test
+   public void testIndividualAckCore() throws Exception {
+      testIndividualAck("CORE", 1024);
+   }
+
+   @Test
+   public void testIndividualAckAMQP() throws Exception {
+      testIndividualAck("AMQP", 1024);
+   }
+
+   /**
+    * Runs the individual-ack scenario with the given client protocol.
+    *
+    * @param protocol protocol name handed to {@code CFUtil.createConnectionFactory}
+    * @param bodySize number of characters in the body of every message sent
+    * @throws Exception on any JMS or server failure
+    */
+   public void testIndividualAck(String protocol, int bodySize) throws Exception {
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+
+      String extraBody = createBody(bodySize);
+
+      // fail fast if setUp did not leave the queue in place
+      Queue queue = server.locateQueue(ADDRESS);
+      Assert.assertNotNull(queue);
+
+      // produce everything in a single transaction
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         javax.jms.Queue jmsQueue = session.createQueue(ADDRESS);
+         MessageProducer producer = session.createProducer(jmsQueue);
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            TextMessage message = session.createTextMessage(extraBody);
+            message.setIntProperty("i", i);
+            producer.send(message);
+         }
+         session.commit();
+      }
+
+      // receive every message, but acknowledge only the one carrying i == ACK_I
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(false, INDIVIDUAL_ACKNOWLEDGE);
+         javax.jms.Queue jmsQueue = session.createQueue(ADDRESS);
+         connection.start();
+         MessageConsumer consumer = session.createConsumer(jmsQueue);
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            if (message.getIntProperty("i") == ACK_I) {
+               message.acknowledge();
+            }
+         }
+         Assert.assertNull(consumer.receiveNoWait());
+      }
+
+      if (restartServerBeforeConsume) {
+         server.stop();
+         server.start();
+      }
+
+      // everything except the individually acknowledged message must still be available
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue jmsQueue = session.createQueue(ADDRESS);
+         connection.start();
+         MessageConsumer consumer = session.createConsumer(jmsQueue);
+         for (int i = 0; i < NUMBER_OF_MESSAGES - 1; i++) {
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            Assert.assertNotEquals(ACK_I, message.getIntProperty("i"));
+         }
+         Assert.assertNull(consumer.receiveNoWait());
+      }
+   }
+
+   /** Builds a message body made of {@code size} '*' characters. */
+   private static String createBody(int size) {
+      StringBuilder body = new StringBuilder(size);
+      for (int i = 0; i < size; i++) {
+         body.append("*");
+      }
+      return body.toString();
+   }
+
+}
\ No newline at end of file
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageAckScanTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageAckScanTest.java
new file mode 100644
index 0000000..2b907d9
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageAckScanTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PageAckScanTest extends ActiveMQTestBase {
+
+   private static final Logger logger = 
Logger.getLogger(PageAckScanTest.class);
+
+   private static final String ADDRESS = "MessagesExpiredPagingTest";
+
+   ActiveMQServer server;
+
+   protected static final int PAGE_MAX = 10 * 1024;
+
+   protected static final int PAGE_SIZE = 1 * 1024;
+
+
+
+   @Before
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+
+      Configuration config = createDefaultConfig(0, 
true).setJournalSyncNonTransactional(false);
+
+      config.setMessageExpiryScanPeriod(-1);
+      server = createServer(true, config, PAGE_SIZE, PAGE_MAX);
+
+      server.getAddressSettingsRepository().clear();
+
+      AddressSettings defaultSetting = new 
AddressSettings().setPageSizeBytes(PAGE_SIZE).setMaxSizeBytes(PAGE_MAX).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setAutoCreateAddresses(false).setAutoCreateQueues(false);
+
+      server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+
+      server.start();
+
+
+      server.addAddressInfo(new 
AddressInfo(ADDRESS).addRoutingType(RoutingType.ANYCAST));
+      server.createQueue(new 
QueueConfiguration(ADDRESS).setRoutingType(RoutingType.ANYCAST));
+
+   }
+
+   @Test
+   public void testScanCore() throws Exception {
+      testScan("CORE", 5000, 1000, 100, 1024);
+   }
+
+   @Test
+   public void testScanAMQP() throws Exception {
+      testScan("AMQP", 5000, 1000, 100, 1024);
+   }
+
+
+   public void testScan(String protocol, int numberOfMessages, int 
numberOfMessageSecondWave, int pagingInterval, int bodySize) throws Exception {
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61616");
+
+      String extraBody;
+      {
+         StringBuffer buffer = new StringBuffer();
+         for (int i = 0; i < bodySize; i++) {
+            buffer.append("*");
+         }
+         extraBody = buffer.toString();
+      }
+
+      Queue queue = server.locateQueue(ADDRESS);
+      queue.getPagingStore().startPaging();
+
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         javax.jms.Queue jmsQueue = session.createQueue(ADDRESS);
+         MessageProducer producer = session.createProducer(jmsQueue);
+         for (int i = 0; i < 20; i++) {
+            TextMessage message = session.createTextMessage(extraBody);
+            message.setIntProperty("i", i);
+            producer.send(message);
+         }
+         session.commit();
+      }
+
+      queue.forEach((r) -> System.out.println("ref -> " + 
r.getMessage().getIntProperty("i")));
+
+      AtomicInteger errors = new AtomicInteger(0);
+      ReusableLatch latch = new ReusableLatch(4);
+      Runnable done = latch::countDown;
+      Runnable notFound = () -> {
+         errors.incrementAndGet();
+         done.run();
+      };
+      PageSubscription subscription = queue.getPageSubscription();
+      subscription.addScanAck(new CompareI(15), done, notFound);
+      subscription.addScanAck(new CompareI(11), done, notFound);
+      subscription.addScanAck(new CompareI(99), done, notFound);
+      subscription.addScanAck(new CompareI(-30), done, notFound);
+      System.out.println("Performing scan...");
+      subscription.performScanAck();
+      Assert.assertTrue(latch.await(5, TimeUnit.MINUTES));
+      Assert.assertEquals(2, errors.get());
+
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Queue jmsQueue = session.createQueue(ADDRESS);
+         connection.start();
+         MessageConsumer consumer = session.createConsumer(jmsQueue);
+         for (int i = 0; i < 18; i++) {
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            Assert.assertTrue(message.getIntProperty("i") != 11 && 
message.getIntProperty("i") != 15);
+         }
+         Assert.assertNull(consumer.receiveNoWait());
+      }
+   }
+
+   // Errorprone would barf at this. it was really intended
+   @SuppressWarnings("ComparableType")
+   class CompareI implements Comparable<PagedReference> {
+      final int i;
+      CompareI(int i) {
+         this.i = i;
+      }
+
+      @Override
+      public int compareTo(PagedReference ref) {
+         System.out.println("Comparing against " + 
ref.getMessage().getIntProperty("i"));
+         return ref.getMessage().getIntProperty("i").intValue() - i;
+      }
+   }
+
+}
\ No newline at end of file
diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml
index 1199697..5ec2737 100644
--- a/tests/smoke-tests/pom.xml
+++ b/tests/smoke-tests/pom.xml
@@ -1017,6 +1017,38 @@
                </execution>
                <execution>
                   <phase>test-compile</phase>
+                  <id>create-brokerConnection-paged-serverA</id>
+                  <goals>
+                     <goal>create</goal>
+                  </goals>
+                  <configuration>
+                     <role>amq</role>
+                     <user>artemis</user>
+                     <password>artemis</password>
+                     <allowAnonymous>true</allowAnonymous>
+                     <noWeb>true</noWeb>
+                     
<instance>${basedir}/target/brokerConnect/pagedA</instance>
+                     
<configuration>${basedir}/target/classes/servers/brokerConnect/pagedA</configuration>
+                  </configuration>
+               </execution>
+               <execution>
+                  <phase>test-compile</phase>
+                  <id>create-brokerConnection-paged-serverB</id>
+                  <goals>
+                     <goal>create</goal>
+                  </goals>
+                  <configuration>
+                     <role>amq</role>
+                     <user>artemis</user>
+                     <password>artemis</password>
+                     <allowAnonymous>true</allowAnonymous>
+                     <noWeb>true</noWeb>
+                     
<instance>${basedir}/target/brokerConnect/pagedB</instance>
+                     
<configuration>${basedir}/target/classes/servers/brokerConnect/pagedB</configuration>
+                  </configuration>
+               </execution>
+               <execution>
+                  <phase>test-compile</phase>
                   <id>create-qdr</id>
                   <goals>
                      <goal>create</goal>
diff --git 
a/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedA/broker.xml 
b/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedA/broker.xml
new file mode 100644
index 0000000..fd6d99e
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedA/broker.xml
@@ -0,0 +1,215 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xmlns:xi="http://www.w3.org/2001/XInclude"
+               xsi:schemaLocation="urn:activemq 
/schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="urn:activemq:core ">
+
+      <name>ServerA</name>
+
+      <persistence-enabled>true</persistence-enabled>
+
+      <!-- this could be ASYNCIO, MAPPED, NIO
+           ASYNCIO: Linux Libaio
+           MAPPED: mmap files
+           NIO: Plain Java Files
+       -->
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>data/paging</paging-directory>
+
+      <bindings-directory>data/bindings</bindings-directory>
+
+      <journal-directory>data/journal</journal-directory>
+
+      <large-messages-directory>data/large-messages</large-messages-directory>
+
+      <journal-datasync>true</journal-datasync>
+
+      <journal-min-files>2</journal-min-files>
+
+      <journal-pool-files>10</journal-pool-files>
+
+      <journal-device-block-size>4096</journal-device-block-size>
+
+      <journal-file-size>10M</journal-file-size>
+
+      <!--
+       This value was determined through a calculation.
+       Your system could perform 25 writes per millisecond
+       on the current journal configuration.
+       That translates as a sync write every 40000 nanoseconds.
+
+       Note: If you specify 0 the system will perform writes directly to the 
disk.
+             We recommend this to be 0 if you are using journalType=MAPPED and 
journal-datasync=false.
+      -->
+      <journal-buffer-timeout>40000</journal-buffer-timeout>
+
+
+      <!--
+        When using ASYNCIO, this will determine the writing queue depth for 
libaio.
+       -->
+      <journal-max-io>1</journal-max-io>
+      <!--
+        You can verify the network health of a particular NIC by specifying 
the <network-check-NIC> element.
+         <network-check-NIC>theNicName</network-check-NIC>
+        -->
+
+      <!--
+        Use this to use an HTTP server to validate the network
+         
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+      <!-- <network-check-period>10000</network-check-period> -->
+      <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+      <!-- this is a comma separated list, no spaces, just DNS or IPs
+           it should accept IPV6
+
+           Warning: Make sure you understand your network topology as this is 
meant to validate if your network is valid.
+                    Using IPs that could eventually disappear or be partially 
visible may defeat the purpose.
+                    You can use a list of multiple IPs, and if any successful 
ping will make the server OK to continue running -->
+      <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+      <!-- use this to customize the ping used for ipv4 addresses -->
+      <!-- <network-check-ping-command>ping -c 1 -t %d 
%s</network-check-ping-command> -->
+
+      <!-- use this to customize the ping used for ipv6 addresses -->
+      <!-- <network-check-ping6-command>ping6 -c 1 
%2$s</network-check-ping6-command> -->
+
+
+
+      <!-- how often we are looking for how many bytes are being used on the 
disk in ms -->
+      <disk-scan-period>5000</disk-scan-period>
+
+      <!-- once the disk hits this limit the system will block, or close the 
connection in certain protocols
+           that won't support flow control. -->
+      <max-disk-usage>90</max-disk-usage>
+
+      <!-- should the broker detect dead locks and other issues -->
+      <critical-analyzer>true</critical-analyzer>
+
+      <critical-analyzer-timeout>120000</critical-analyzer-timeout>
+
+      <critical-analyzer-check-period>60000</critical-analyzer-check-period>
+
+      <critical-analyzer-policy>HALT</critical-analyzer-policy>
+
+
+      <page-sync-timeout>40000</page-sync-timeout>
+
+
+      <acceptors>
+         <acceptor 
name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
+      </acceptors>
+
+
+      <broker-connections>
+         <amqp-connection uri="tcp://localhost:61617" name="outgoing" 
reconnect-attempts="-1" retry-interval="100" user="B" password="B">
+            <mirror durable="true"/>
+         </amqp-connection>
+      </broker-connections>
+
+
+      <security-settings>
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="amq"/>
+            <permission type="deleteNonDurableQueue" roles="amq"/>
+            <permission type="createDurableQueue" roles="amq"/>
+            <permission type="deleteDurableQueue" roles="amq"/>
+            <permission type="createAddress" roles="amq"/>
+            <permission type="deleteAddress" roles="amq"/>
+            <permission type="consume" roles="amq"/>
+            <permission type="browse" roles="amq"/>
+            <permission type="send" roles="amq"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="amq"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to 
be auto-create -->
+         <address-setting match="activemq.management#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+            <auto-create-jms-queues>true</auto-create-jms-queues>
+            <auto-create-jms-topics>true</auto-create-jms-topics>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <max-size-bytes>100000</max-size-bytes>
+            <page-size-bytes>10000</page-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+            <auto-create-jms-queues>true</auto-create-jms-queues>
+            <auto-create-jms-topics>true</auto-create-jms-topics>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="DLQ">
+            <anycast>
+               <queue name="DLQ" />
+            </anycast>
+         </address>
+         <address name="ExpiryQueue">
+            <anycast>
+               <queue name="ExpiryQueue" />
+            </anycast>
+         </address>
+         <address name="someQueue">
+            <anycast>
+               <queue name="someQueue" />
+            </anycast>
+         </address>
+      </addresses>
+
+
+      <!-- Uncomment the following if you want to use the Standard 
LoggingActiveMQServerPlugin plugin to log events
+      <broker-plugins>
+         <broker-plugin 
class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
+            <property key="LOG_ALL_EVENTS" value="true"/>
+            <property key="LOG_CONNECTION_EVENTS" value="true"/>
+            <property key="LOG_SESSION_EVENTS" value="true"/>
+            <property key="LOG_CONSUMER_EVENTS" value="true"/>
+            <property key="LOG_DELIVERING_EVENTS" value="true"/>
+            <property key="LOG_SENDING_EVENTS" value="true"/>
+            <property key="LOG_INTERNAL_EVENTS" value="true"/>
+         </broker-plugin>
+      </broker-plugins>
+      -->
+
+   </core>
+</configuration>
diff --git 
a/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedA/logging.properties
 
b/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedA/logging.properties
new file mode 100644
index 0000000..810329f
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedA/logging.properties
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Additional logger names to configure (root logger is always configured)
+# Root logger option
+loggers=org.eclipse.jetty,org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.utils.critical,org.apache.activemq.artemis.journal,org.apache.activemq.artemis.jms.server,org.apache.activemq.artemis.integration.bootstrap,org.apache.activemq.audit.base,org.apache.activemq.audit.message,org.apache.activemq.audit.resource,org.apache.activemq.artemis.utils.pools,org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirro
 [...]
+
+# Special logger to debug mirror Security
+
+# Root logger level
+logger.level=INFO
+# ActiveMQ Artemis logger levels
+
+# These levels are candidates to eventually debug
+logger.org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget.level=INFO
+
+# if you have issues with CriticalAnalyzer, setting this as TRACE would give 
you extra troubleshooting information.
+# but do not use it regularly, as it would incur some extra CPU usage for this diagnostic.
+logger.org.apache.activemq.artemis.utils.critical.level=INFO
+
+logger.org.eclipse.jetty.level=WARN
+# Root logger handlers
+logger.handlers=FILE,CONSOLE
+
+# to enable audit change the level to INFO
+logger.org.apache.activemq.audit.base.level=ERROR
+logger.org.apache.activemq.audit.base.handlers=AUDIT_FILE
+logger.org.apache.activemq.audit.base.useParentHandlers=false
+
+logger.org.apache.activemq.audit.resource.level=ERROR
+logger.org.apache.activemq.audit.resource.handlers=AUDIT_FILE
+logger.org.apache.activemq.audit.resource.useParentHandlers=false
+
+logger.org.apache.activemq.audit.message.level=ERROR
+logger.org.apache.activemq.audit.message.handlers=AUDIT_FILE
+logger.org.apache.activemq.audit.message.useParentHandlers=false
+
+# Console handler configuration
+handler.CONSOLE=org.jboss.logmanager.handlers.ConsoleHandler
+handler.CONSOLE.properties=autoFlush
+handler.CONSOLE.level=TRACE
+handler.CONSOLE.autoFlush=true
+handler.CONSOLE.formatter=PATTERN
+
+# File handler configuration
+handler.FILE=org.jboss.logmanager.handlers.PeriodicRotatingFileHandler
+handler.FILE.level=TRACE
+handler.FILE.properties=suffix,append,autoFlush,fileName
+handler.FILE.suffix=.yyyy-MM-dd
+handler.FILE.append=true
+handler.FILE.autoFlush=true
+handler.FILE.fileName=${artemis.instance}/log/artemis.log
+handler.FILE.formatter=PATTERN
+
+# Formatter pattern configuration
+formatter.PATTERN=org.jboss.logmanager.formatters.PatternFormatter
+formatter.PATTERN.properties=pattern
+formatter.PATTERN.pattern=[%t] %d %-5p [%c] %s%E%n
+
+#Audit logger
+handler.AUDIT_FILE=org.jboss.logmanager.handlers.PeriodicRotatingFileHandler
+handler.AUDIT_FILE.level=INFO
+handler.AUDIT_FILE.properties=suffix,append,autoFlush,fileName
+handler.AUDIT_FILE.suffix=.yyyy-MM-dd
+handler.AUDIT_FILE.append=true
+handler.AUDIT_FILE.autoFlush=true
+handler.AUDIT_FILE.fileName=${artemis.instance}/log/audit.log
+handler.AUDIT_FILE.formatter=AUDIT_PATTERN
+
+formatter.AUDIT_PATTERN=org.jboss.logmanager.formatters.PatternFormatter
+formatter.AUDIT_PATTERN.properties=pattern
+formatter.AUDIT_PATTERN.pattern=%d [AUDIT](%t) %s%E%n
diff --git 
a/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedB/broker.xml 
b/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedB/broker.xml
new file mode 100644
index 0000000..c380d38
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedB/broker.xml
@@ -0,0 +1,215 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xmlns:xi="http://www.w3.org/2001/XInclude"
+               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="urn:activemq:core ">
+
+      <name>ServerB</name>
+
+      <persistence-enabled>true</persistence-enabled>
+
+      <!-- this could be ASYNCIO, MAPPED, NIO
+           ASYNCIO: Linux Libaio
+           MAPPED: mmap files
+           NIO: Plain Java Files
+       -->
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>data/paging</paging-directory>
+
+      <bindings-directory>data/bindings</bindings-directory>
+
+      <journal-directory>data/journal</journal-directory>
+
+      <large-messages-directory>data/large-messages</large-messages-directory>
+
+      <journal-datasync>true</journal-datasync>
+
+      <journal-min-files>2</journal-min-files>
+
+      <journal-pool-files>10</journal-pool-files>
+
+      <journal-device-block-size>4096</journal-device-block-size>
+
+      <journal-file-size>10M</journal-file-size>
+
+      <!--
+       This value was determined through a calculation.
+       Your system could perform 25 writes per millisecond
+       on the current journal configuration.
+       That translates as a sync write every 40000 nanoseconds.
+
+       Note: If you specify 0 the system will perform writes directly to the 
disk.
+             We recommend this to be 0 if you are using journalType=MAPPED and 
journal-datasync=false.
+      -->
+      <journal-buffer-timeout>40000</journal-buffer-timeout>
+
+
+      <!--
+        When using ASYNCIO, this will determine the writing queue depth for 
libaio.
+       -->
+      <journal-max-io>1</journal-max-io>
+      <!--
+        You can verify the network health of a particular NIC by specifying 
the <network-check-NIC> element.
+         <network-check-NIC>theNicName</network-check-NIC>
+        -->
+
+      <!--
+        Use this to use an HTTP server to validate the network
+         
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+      <!-- <network-check-period>10000</network-check-period> -->
+      <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+      <!-- this is a comma separated list, no spaces, just DNS or IPs
+           it should accept IPV6
+
+           Warning: Make sure you understand your network topology as this is 
meant to validate if your network is valid.
+                    Using IPs that could eventually disappear or be partially 
visible may defeat the purpose.
+                    You can use a list of multiple IPs; a successful ping to any one of them is enough for the server to continue running -->
+      <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+      <!-- use this to customize the ping used for ipv4 addresses -->
+      <!-- <network-check-ping-command>ping -c 1 -t %d 
%s</network-check-ping-command> -->
+
+      <!-- use this to customize the ping used for ipv6 addresses -->
+      <!-- <network-check-ping6-command>ping6 -c 1 
%2$s</network-check-ping6-command> -->
+
+
+
+
+      <!-- how often we are looking for how many bytes are being used on the 
disk in ms -->
+      <disk-scan-period>5000</disk-scan-period>
+
+      <!-- once the disk hits this limit the system will block, or close the 
connection in certain protocols
+           that won't support flow control. -->
+      <max-disk-usage>90</max-disk-usage>
+
+      <!-- should the broker detect dead locks and other issues -->
+      <critical-analyzer>true</critical-analyzer>
+
+      <critical-analyzer-timeout>120000</critical-analyzer-timeout>
+
+      <critical-analyzer-check-period>60000</critical-analyzer-check-period>
+
+      <critical-analyzer-policy>HALT</critical-analyzer-policy>
+
+
+      <page-sync-timeout>40000</page-sync-timeout>
+
+
+      <acceptors>
+         <acceptor 
name="artemis">tcp://0.0.0.0:61617?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
+      </acceptors>
+
+
+      <broker-connections>
+         <amqp-connection uri="tcp://localhost:61616" name="serverA" 
user="artemis" password="artemis" retry-interval="100" reconnect-attempts="-1">
+            <mirror durable="true"/>
+         </amqp-connection>
+      </broker-connections>
+
+      <security-settings>
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="amq"/>
+            <permission type="deleteNonDurableQueue" roles="amq"/>
+            <permission type="createDurableQueue" roles="amq"/>
+            <permission type="deleteDurableQueue" roles="amq"/>
+            <permission type="createAddress" roles="amq"/>
+            <permission type="deleteAddress" roles="amq"/>
+            <permission type="consume" roles="amq"/>
+            <permission type="browse" roles="amq"/>
+            <permission type="send" roles="amq"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="amq"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to 
be auto-create -->
+         <address-setting match="activemq.management#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+            <auto-create-jms-queues>true</auto-create-jms-queues>
+            <auto-create-jms-topics>true</auto-create-jms-topics>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <max-size-bytes>100000</max-size-bytes>
+            <page-size-bytes>10000</page-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+            <auto-create-jms-queues>true</auto-create-jms-queues>
+            <auto-create-jms-topics>true</auto-create-jms-topics>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="DLQ">
+            <anycast>
+               <queue name="DLQ" />
+            </anycast>
+         </address>
+         <address name="ExpiryQueue">
+            <anycast>
+               <queue name="ExpiryQueue" />
+            </anycast>
+         </address>
+         <address name="someQueue">
+            <anycast>
+               <queue name="someQueue" />
+            </anycast>
+         </address>
+      </addresses>
+
+
+      <!-- Uncomment the following if you want to use the standard LoggingActiveMQServerPlugin plugin to log events
+      <broker-plugins>
+         <broker-plugin 
class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
+            <property key="LOG_ALL_EVENTS" value="true"/>
+            <property key="LOG_CONNECTION_EVENTS" value="true"/>
+            <property key="LOG_SESSION_EVENTS" value="true"/>
+            <property key="LOG_CONSUMER_EVENTS" value="true"/>
+            <property key="LOG_DELIVERING_EVENTS" value="true"/>
+            <property key="LOG_SENDING_EVENTS" value="true"/>
+            <property key="LOG_INTERNAL_EVENTS" value="true"/>
+         </broker-plugin>
+      </broker-plugins>
+      -->
+
+   </core>
+</configuration>
diff --git 
a/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedB/logging.properties
 
b/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedB/logging.properties
new file mode 100644
index 0000000..810329f
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/brokerConnect/pagedB/logging.properties
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Additional logger names to configure (root logger is always configured)
+# Root logger option
+loggers=org.eclipse.jetty,org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.utils.critical,org.apache.activemq.artemis.journal,org.apache.activemq.artemis.jms.server,org.apache.activemq.artemis.integration.bootstrap,org.apache.activemq.audit.base,org.apache.activemq.audit.message,org.apache.activemq.audit.resource,org.apache.activemq.artemis.utils.pools,org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirro
 [...]
+
+# Special logger to debug mirror Security
+
+# Root logger level
+logger.level=INFO
+# ActiveMQ Artemis logger levels
+
+# These levels are candidates to eventually debug
+logger.org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerTarget.level=INFO
+
+# if you have issues with CriticalAnalyzer, setting this as TRACE would give you extra troubleshooting information.
+# but do not use it regularly as it would incur some extra CPU usage for this diagnostic.
+logger.org.apache.activemq.artemis.utils.critical.level=INFO
+
+logger.org.eclipse.jetty.level=WARN
+# Root logger handlers
+logger.handlers=FILE,CONSOLE
+
+# to enable audit change the level to INFO
+logger.org.apache.activemq.audit.base.level=ERROR
+logger.org.apache.activemq.audit.base.handlers=AUDIT_FILE
+logger.org.apache.activemq.audit.base.useParentHandlers=false
+
+logger.org.apache.activemq.audit.resource.level=ERROR
+logger.org.apache.activemq.audit.resource.handlers=AUDIT_FILE
+logger.org.apache.activemq.audit.resource.useParentHandlers=false
+
+logger.org.apache.activemq.audit.message.level=ERROR
+logger.org.apache.activemq.audit.message.handlers=AUDIT_FILE
+logger.org.apache.activemq.audit.message.useParentHandlers=false
+
+# Console handler configuration
+handler.CONSOLE=org.jboss.logmanager.handlers.ConsoleHandler
+handler.CONSOLE.properties=autoFlush
+handler.CONSOLE.level=TRACE
+handler.CONSOLE.autoFlush=true
+handler.CONSOLE.formatter=PATTERN
+
+# File handler configuration
+handler.FILE=org.jboss.logmanager.handlers.PeriodicRotatingFileHandler
+handler.FILE.level=TRACE
+handler.FILE.properties=suffix,append,autoFlush,fileName
+handler.FILE.suffix=.yyyy-MM-dd
+handler.FILE.append=true
+handler.FILE.autoFlush=true
+handler.FILE.fileName=${artemis.instance}/log/artemis.log
+handler.FILE.formatter=PATTERN
+
+# Formatter pattern configuration
+formatter.PATTERN=org.jboss.logmanager.formatters.PatternFormatter
+formatter.PATTERN.properties=pattern
+formatter.PATTERN.pattern=[%t] %d %-5p [%c] %s%E%n
+
+#Audit logger
+handler.AUDIT_FILE=org.jboss.logmanager.handlers.PeriodicRotatingFileHandler
+handler.AUDIT_FILE.level=INFO
+handler.AUDIT_FILE.properties=suffix,append,autoFlush,fileName
+handler.AUDIT_FILE.suffix=.yyyy-MM-dd
+handler.AUDIT_FILE.append=true
+handler.AUDIT_FILE.autoFlush=true
+handler.AUDIT_FILE.fileName=${artemis.instance}/log/audit.log
+handler.AUDIT_FILE.formatter=AUDIT_PATTERN
+
+formatter.AUDIT_PATTERN=org.jboss.logmanager.formatters.PatternFormatter
+formatter.AUDIT_PATTERN.properties=pattern
+formatter.AUDIT_PATTERN.pattern=%d [AUDIT](%t) %s%E%n
diff --git 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/PagedMirrorSmokeTest.java
 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/PagedMirrorSmokeTest.java
new file mode 100644
index 0000000..a65cfcb
--- /dev/null
+++ 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/PagedMirrorSmokeTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <br>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <br>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.brokerConnection;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import 
org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Smoke test that starts two broker processes ({@code brokerConnect/pagedA} and
+ * {@code brokerConnect/pagedB}) and checks that an individual acknowledgement made on
+ * one broker is also visible on the other.
+ * <p>
+ * The test sends messages to {@code someQueue} through port 61616, acknowledges exactly
+ * one of them there, waits until each broker's journal contains a single
+ * {@code ACKNOWLEDGE_CURSOR} record, and then verifies through port 61617 that the
+ * acknowledged message is the only one missing.
+ */
+public class PagedMirrorSmokeTest extends SmokeTestBase {
+
+   // Change this to true to generate a print-data in certain cases on this test
+   // NOTE(review): this flag is not read anywhere in this class at the moment.
+   private static final boolean PRINT_DATA = false;
+
+   private static final Logger logger = Logger.getLogger(PagedMirrorSmokeTest.class);
+
+   public static final String SERVER_NAME_A = "brokerConnect/pagedA";
+   public static final String SERVER_NAME_B = "brokerConnect/pagedB";
+
+   Process processB;
+   Process processA;
+
+   /**
+    * Wipes the data of both servers, starts them (B first, then A) and waits up to
+    * 30 seconds for each one to accept connections.
+    * <p>
+    * Despite its name this method is annotated with {@code @Before}, so it runs
+    * before every test method rather than once per class.
+    *
+    * @throws Exception if cleaning up, starting, or waiting for a server fails
+    */
+   @Before
+   public  void beforeClass() throws Exception {
+      cleanupData(SERVER_NAME_A);
+      cleanupData(SERVER_NAME_B);
+      processB = startServer(SERVER_NAME_B, 1, 0);
+      processA = startServer(SERVER_NAME_A, 0, 0);
+
+      ServerUtil.waitForServerToStart(1, "B", "B", 30000);
+      ServerUtil.waitForServerToStart(0, "A", "A", 30000);
+   }
+
+   /**
+    * Sends {@code NUMBER_OF_MESSAGES} 1 KiB text messages in one transaction, receives
+    * all of them on the sending broker while acknowledging only the message whose
+    * {@code i} property equals {@code ACK_I}, and then checks that:
+    * <ul>
+    *    <li>each broker's journal holds exactly one cursor acknowledgement record;</li>
+    *    <li>the second broker delivers every message except the acknowledged one.</li>
+    * </ul>
+    *
+    * @throws Throwable on any JMS, journal-reading, or assertion failure
+    */
+   @Test
+   public void testPaged() throws Throwable {
+      String sendURI = "tcp://localhost:61616";
+      String consumeURI = "tcp://localhost:61616";
+      String secondConsumeURI = "tcp://localhost:61617";
+
+      File countJournalLocation = new File(getServerLocation(SERVER_NAME_A), "data/journal");
+      File countJournalLocationB = new File(getServerLocation(SERVER_NAME_B), "data/journal");
+      Assert.assertTrue(countJournalLocation.exists() && countJournalLocation.isDirectory());
+      // Both journals are scanned below, so validate the second location as well.
+      Assert.assertTrue(countJournalLocationB.exists() && countJournalLocationB.isDirectory());
+      String protocol = "amqp";
+
+      ConnectionFactory sendCF = CFUtil.createConnectionFactory(protocol, sendURI);
+      ConnectionFactory consumeCF = CFUtil.createConnectionFactory(protocol, consumeURI);
+      ConnectionFactory secondConsumeCF = CFUtil.createConnectionFactory(protocol, secondConsumeURI);
+
+      // 1024-character body; the builder never leaves this thread, so StringBuilder is enough.
+      String bodyBuffer;
+      {
+         StringBuilder buffer = new StringBuilder();
+         for (int i = 0; i < 1024; i++) {
+            buffer.append("*");
+         }
+         bodyBuffer = buffer.toString();
+      }
+
+      int NUMBER_OF_MESSAGES = 200;
+      int ACK_I = 77;
+
+      // Send everything in a single transaction; each message carries its index in property "i".
+      try (Connection sendConnection = sendCF.createConnection()) {
+         Session sendSession = sendConnection.createSession(true, Session.SESSION_TRANSACTED);
+         Queue jmsQueue = sendSession.createQueue("someQueue");
+         MessageProducer producer = sendSession.createProducer(jmsQueue);
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            TextMessage message = sendSession.createTextMessage(bodyBuffer);
+            message.setIntProperty("i", i);
+            producer.send(message);
+         }
+         sendSession.commit();
+      }
+
+      Thread.sleep(500);
+      try (Connection consumeConnection = consumeCF.createConnection()) {
+         Session consumeSession = consumeConnection.createSession(false, 101); // individual ack
+         Queue jmsQueue = consumeSession.createQueue("someQueue");
+         MessageConsumer consumer = consumeSession.createConsumer(jmsQueue);
+         consumeConnection.start();
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            TextMessage message = (TextMessage) consumer.receive(6000);
+            // receive(timeout) yields null on timeout; fail with an assertion instead of an NPE.
+            Assert.assertNotNull(message);
+            if (message.getIntProperty("i") == ACK_I) {
+               message.acknowledge();
+            }
+         }
+         Assert.assertNull(consumer.receiveNoWait());
+      }
+      // Exactly one acknowledgement record is expected in each broker's journal.
+      Wait.assertEquals(1, () -> acksCount(countJournalLocation), 5000, 1000);
+      Wait.assertEquals(1, () -> acksCount(countJournalLocationB), 5000, 1000);
+
+      // The second broker must deliver every message except the acknowledged one.
+      try (Connection consumeConnection = secondConsumeCF.createConnection()) {
+         Session consumeSession = consumeConnection.createSession(true, Session.SESSION_TRANSACTED);
+         Queue jmsQueue = consumeSession.createQueue("someQueue");
+         MessageConsumer consumer = consumeSession.createConsumer(jmsQueue);
+         consumeConnection.start();
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES - 1; i++) {
+            TextMessage message = (TextMessage) consumer.receive(6000);
+            Assert.assertNotNull(message);
+            Assert.assertNotEquals(ACK_I, message.getIntProperty("i"));
+         }
+         Assert.assertNull(consumer.receiveNoWait());
+      }
+   }
+
+   /**
+    * Counts the {@code ACKNOWLEDGE_CURSOR} records found in the journal at the given location.
+    *
+    * @param countJournalLocation journal directory to scan
+    * @return the number of cursor acknowledgement records, or 0 when none were counted
+    * @throws Exception if the journal cannot be read
+    */
+   private int acksCount(File countJournalLocation) throws Exception {
+      // NOTE(review): 10485760 and the first 2 presumably mirror journal-file-size (10M) and
+      // journal-min-files (2) from broker.xml - confirm against countJournal's signature.
+      HashMap<Integer, AtomicInteger> countJournal = countJournal(countJournalLocation, 10485760, 2, 2);
+      AtomicInteger acksCount = countJournal.get((int)JournalRecordIds.ACKNOWLEDGE_CURSOR);
+      return acksCount != null ? acksCount.get() : 0;
+   }
+
+}

Reply via email to